initial manifest manager, not used yet

This commit is contained in:
Jarek Kowalski
2017-11-08 18:58:41 -08:00
parent 48a88eee09
commit 2f25d8b9e8
2 changed files with 365 additions and 0 deletions

View File

@@ -0,0 +1,211 @@
package manifest
import (
"crypto/rand"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"sort"
"sync"
"time"
"github.com/kopia/kopia/block"
)
// ErrNotFound is returned when the metadata item is not found.
var ErrNotFound = errors.New("not found")
const manifestGroupID = "manifests"
// Manager organizes JSON manifests of various kinds, including snapshot manifests
type Manager struct {
mu sync.Mutex
b *block.Manager
entries map[string]*Entry
pendingEntries []*Entry
}
func (m *Manager) Add(labels map[string]string, payload interface{}) (string, error) {
m.mu.Lock()
defer m.mu.Unlock()
random := make([]byte, 16)
rand.Read(random)
b, err := json.Marshal(payload)
if err != nil {
return "", err
}
e := &Entry{
ID: hex.EncodeToString(random),
ModTime: time.Now().UTC(),
Labels: copyLabels(labels),
Content: b,
}
m.pendingEntries = append(m.pendingEntries, e)
m.entries[e.ID] = e
return e.ID, nil
}
func (m *Manager) Get(id string, data interface{}) (map[string]string, error) {
m.mu.Lock()
defer m.mu.Unlock()
e := m.entries[id]
if e == nil {
return nil, ErrNotFound
}
if err := json.Unmarshal(e.Content, data); err != nil {
return nil, fmt.Errorf("unable to unmashal %q: %v", id, err)
}
return copyLabels(e.Labels), nil
}
func (m *Manager) Find(labels map[string]string) []string {
m.mu.Lock()
defer m.mu.Unlock()
var matches []string
for id, e := range m.entries {
if matchesLabels(e.Labels, labels) {
matches = append(matches, id)
}
}
sort.Slice(matches, func(i, j int) bool {
return m.entries[matches[i]].ModTime.Before(m.entries[matches[j]].ModTime)
})
return matches
}
// matchesLabels returns true when all entries in 'b' are found in the 'a'.
func matchesLabels(a, b map[string]string) bool {
for k, v := range b {
if a[k] != v {
return false
}
}
return true
}
func (m *Manager) Flush() error {
m.mu.Lock()
defer m.mu.Unlock()
if len(m.pendingEntries) == 0 {
return nil
}
man := Manifest{
Entries: m.pendingEntries,
}
b, err := json.Marshal(man)
if err != nil {
return fmt.Errorf("unable to marshal: %v", err)
}
if _, err := m.b.WriteBlock(manifestGroupID, b); err != nil {
return err
}
if err := m.b.Flush(); err != nil {
return err
}
m.pendingEntries = nil
return nil
}
func (m *Manager) Delete(id string) {
if m.entries[id] == nil {
return
}
delete(m.entries, id)
m.pendingEntries = append(m.pendingEntries, &Entry{
ID: id,
ModTime: time.Now().UTC(),
Deleted: true,
})
}
func (m *Manager) Load() error {
if err := m.Flush(); err != nil {
return err
}
m.entries = map[string]*Entry{}
for _, i := range m.b.ListGroupBlocks(manifestGroupID) {
blk, err := m.b.GetBlock(i.BlockID)
if err != nil {
return fmt.Errorf("unable to read block %q: %v", i.BlockID, err)
}
var man Manifest
if err := json.Unmarshal(blk, &man); err != nil {
return fmt.Errorf("unable to parse block %q: %v", i.BlockID, err)
}
for _, e := range man.Entries {
m.mergeEntry(e)
}
}
return nil
}
func (m *Manager) mergeEntry(e *Entry) error {
prev := m.entries[e.ID]
if prev == nil {
m.entries[e.ID] = e
return nil
}
if e.ModTime.After(prev.ModTime) {
m.entries[e.ID] = e
}
return nil
}
func copyLabels(m map[string]string) map[string]string {
r := map[string]string{}
for k, v := range m {
r[k] = v
}
return r
}
type Entry struct {
ID string `json:"id"`
Labels map[string]string `json:"labels"`
ModTime time.Time `json:"modified"`
Deleted bool `json:"deleted,omitempty"`
Content json.RawMessage `json:"data"`
}
type Manifest struct {
Entries []*Entry `json:"entries"`
}
func NewManager(b *block.Manager) (*Manager, error) {
m := &Manager{
b: b,
entries: map[string]*Entry{},
}
if err := m.Load(); err != nil {
return nil, err
}
return m, nil
}

View File

@@ -0,0 +1,154 @@
package manifest
import (
"reflect"
"sort"
"testing"
"github.com/kopia/kopia/internal/storagetesting"
"github.com/kopia/kopia/block"
)
func TestManifest(t *testing.T) {
data := map[string][]byte{}
mgr, err := newManaferForTesting(t, data)
if err != nil {
t.Fatalf("unable to open block manager: %v", mgr)
}
item1 := map[string]int{"foo": 1, "bar": 2}
item2 := map[string]int{"foo": 2, "bar": 3}
item3 := map[string]int{"foo": 3, "bar": 4}
labels1 := map[string]string{"color": "red"}
labels2 := map[string]string{"color": "blue", "shape": "square"}
labels3 := map[string]string{"shape": "square", "color": "red"}
id1 := addAndVerify(t, mgr, labels1, item1)
id2 := addAndVerify(t, mgr, labels2, item2)
id3 := addAndVerify(t, mgr, labels3, item3)
cases := []struct {
criteria map[string]string
expected []string
}{
{map[string]string{"color": "red"}, []string{id1, id3}},
{map[string]string{"color": "blue"}, []string{id2}},
{map[string]string{"color": "green"}, nil},
{map[string]string{"color": "red", "shape": "square"}, []string{id3}},
{map[string]string{"color": "blue", "shape": "square"}, []string{id2}},
{map[string]string{"color": "red", "shape": "circle"}, nil},
}
// verify before flush
for _, tc := range cases {
verifyMatches(t, mgr, tc.criteria, tc.expected)
}
verifyItem(t, mgr, id1, labels1, item1)
verifyItem(t, mgr, id2, labels2, item2)
verifyItem(t, mgr, id3, labels3, item3)
if err := mgr.Flush(); err != nil {
t.Errorf("flush error: %v", err)
}
if err := mgr.Flush(); err != nil {
t.Errorf("flush error: %v", err)
}
// verify after flush
for _, tc := range cases {
verifyMatches(t, mgr, tc.criteria, tc.expected)
}
verifyItem(t, mgr, id1, labels1, item1)
verifyItem(t, mgr, id2, labels2, item2)
verifyItem(t, mgr, id3, labels3, item3)
// verify in new manager
mgr2, err := newManaferForTesting(t, data)
if err != nil {
t.Fatalf("can't open block manager: %v", err)
}
for _, tc := range cases {
verifyMatches(t, mgr2, tc.criteria, tc.expected)
}
verifyItem(t, mgr2, id1, labels1, item1)
verifyItem(t, mgr2, id2, labels2, item2)
verifyItem(t, mgr2, id3, labels3, item3)
if err := mgr2.Flush(); err != nil {
t.Errorf("flush error: %v", err)
}
// delete from one
mgr.Delete(id3)
verifyItemNotFound(t, mgr, id3)
mgr.Flush()
verifyItemNotFound(t, mgr, id3)
// still found in another
verifyItem(t, mgr2, id3, labels3, item3)
if err := mgr2.Load(); err != nil {
t.Errorf("unable to load: %v", err)
}
}
func addAndVerify(t *testing.T, mgr *Manager, labels map[string]string, data map[string]int) string {
t.Helper()
id, err := mgr.Add(labels, data)
if err != nil {
t.Errorf("unable to add %v (%v): %v", labels, data, err)
return ""
}
verifyItem(t, mgr, id, labels, data)
return id
}
func verifyItem(t *testing.T, mgr *Manager, id string, labels map[string]string, data map[string]int) {
t.Helper()
var retrieved map[string]int
l, err := mgr.Get(id, &retrieved)
if err != nil {
t.Errorf("unable to retrieve %q: %v", id, err)
return
}
if !reflect.DeepEqual(l, labels) {
t.Errorf("invalid labels retrieved %v, wanted %v", l, labels)
}
}
func verifyItemNotFound(t *testing.T, mgr *Manager, id string) {
t.Helper()
var retrieved map[string]int
_, err := mgr.Get(id, &retrieved)
if got, want := err, ErrNotFound; got != want {
t.Errorf("invalid error when getting %q %v, expected %v", id, err, ErrNotFound)
return
}
}
func verifyMatches(t *testing.T, mgr *Manager, labels map[string]string, expected []string) {
t.Helper()
matches := mgr.Find(labels)
sort.Strings(matches)
sort.Strings(expected)
if !reflect.DeepEqual(matches, expected) {
t.Errorf("invalid matches for %v: %v, expected %v", labels, matches, expected)
}
}
func newManaferForTesting(t *testing.T, data map[string][]byte) (*Manager, error) {
formatter, err := block.FormatterFactories["TESTONLY_MD5"](block.FormattingOptions{})
if err != nil {
panic("can't create formatter")
}
st := storagetesting.NewMapStorage(data)
return NewManager(block.NewManager(st, 10000, 100000, formatter))
}