From 2f25d8b9e894ddfba871d246760a3dd80d18d72c Mon Sep 17 00:00:00 2001 From: Jarek Kowalski Date: Wed, 8 Nov 2017 18:58:41 -0800 Subject: [PATCH] initial manifest manager, not used yet --- manifest/manifest_manager.go | 211 ++++++++++++++++++++++++++++++ manifest/manifest_manager_test.go | 154 ++++++++++++++++++++++ 2 files changed, 365 insertions(+) create mode 100644 manifest/manifest_manager.go create mode 100644 manifest/manifest_manager_test.go diff --git a/manifest/manifest_manager.go b/manifest/manifest_manager.go new file mode 100644 index 000000000..fd43cb281 --- /dev/null +++ b/manifest/manifest_manager.go @@ -0,0 +1,211 @@ +package manifest + +import ( + "crypto/rand" + "encoding/hex" + "encoding/json" + "errors" + "fmt" + "sort" + "sync" + "time" + + "github.com/kopia/kopia/block" +) + +// ErrNotFound is returned when the metadata item is not found. +var ErrNotFound = errors.New("not found") + +const manifestGroupID = "manifests" + +// Manager organizes JSON manifests of various kinds, including snapshot manifests +type Manager struct { + mu sync.Mutex + b *block.Manager + entries map[string]*Entry + pendingEntries []*Entry +} + +func (m *Manager) Add(labels map[string]string, payload interface{}) (string, error) { + m.mu.Lock() + defer m.mu.Unlock() + + random := make([]byte, 16) + rand.Read(random) + + b, err := json.Marshal(payload) + if err != nil { + return "", err + } + + e := &Entry{ + ID: hex.EncodeToString(random), + ModTime: time.Now().UTC(), + Labels: copyLabels(labels), + Content: b, + } + + m.pendingEntries = append(m.pendingEntries, e) + m.entries[e.ID] = e + + return e.ID, nil +} + +func (m *Manager) Get(id string, data interface{}) (map[string]string, error) { + m.mu.Lock() + defer m.mu.Unlock() + + e := m.entries[id] + if e == nil { + return nil, ErrNotFound + } + + if err := json.Unmarshal(e.Content, data); err != nil { + return nil, fmt.Errorf("unable to unmashal %q: %v", id, err) + } + + return copyLabels(e.Labels), nil +} + +func (m *Manager) Find(labels map[string]string) []string { + m.mu.Lock() + defer m.mu.Unlock() + + var matches []string + for id, e := range m.entries { + if matchesLabels(e.Labels, labels) { + matches = append(matches, id) + } + } + + sort.Slice(matches, func(i, j int) bool { + return m.entries[matches[i]].ModTime.Before(m.entries[matches[j]].ModTime) + }) + return matches +} + +// matchesLabels returns true when all entries in 'b' are found in the 'a'. +func matchesLabels(a, b map[string]string) bool { + for k, v := range b { + if a[k] != v { + return false + } + } + + return true +} + +func (m *Manager) Flush() error { + m.mu.Lock() + defer m.mu.Unlock() + + if len(m.pendingEntries) == 0 { + return nil + } + + man := Manifest{ + Entries: m.pendingEntries, + } + + b, err := json.Marshal(man) + if err != nil { + return fmt.Errorf("unable to marshal: %v", err) + } + + if _, err := m.b.WriteBlock(manifestGroupID, b); err != nil { + return err + } + + if err := m.b.Flush(); err != nil { + return err + } + + m.pendingEntries = nil + return nil +} + +func (m *Manager) Delete(id string) { + if m.entries[id] == nil { + return + } + + delete(m.entries, id) + m.pendingEntries = append(m.pendingEntries, &Entry{ + ID: id, + ModTime: time.Now().UTC(), + Deleted: true, + }) +} + +func (m *Manager) Load() error { + if err := m.Flush(); err != nil { + return err + } + + m.entries = map[string]*Entry{} + + for _, i := range m.b.ListGroupBlocks(manifestGroupID) { + blk, err := m.b.GetBlock(i.BlockID) + if err != nil { + return fmt.Errorf("unable to read block %q: %v", i.BlockID, err) + } + + var man Manifest + if err := json.Unmarshal(blk, &man); err != nil { + return fmt.Errorf("unable to parse block %q: %v", i.BlockID, err) + } + + for _, e := range man.Entries { + m.mergeEntry(e) + } + } + + return nil +} + +func (m *Manager) mergeEntry(e *Entry) error { + prev := m.entries[e.ID] + if prev == nil { + m.entries[e.ID] = e + return nil + } + + if e.ModTime.After(prev.ModTime) { + m.entries[e.ID] = e + } + + return nil +} + +func copyLabels(m map[string]string) map[string]string { + r := map[string]string{} + for k, v := range m { + r[k] = v + } + return r +} + +type Entry struct { + ID string `json:"id"` + Labels map[string]string `json:"labels"` + ModTime time.Time `json:"modified"` + Deleted bool `json:"deleted,omitempty"` + Content json.RawMessage `json:"data"` +} + +type Manifest struct { + Entries []*Entry `json:"entries"` +} + +func NewManager(b *block.Manager) (*Manager, error) { + m := &Manager{ + b: b, + entries: map[string]*Entry{}, + } + + if err := m.Load(); err != nil { + return nil, err + } + + return m, nil +} diff --git a/manifest/manifest_manager_test.go b/manifest/manifest_manager_test.go new file mode 100644 index 000000000..359b7cc9b --- /dev/null +++ b/manifest/manifest_manager_test.go @@ -0,0 +1,154 @@ +package manifest + +import ( + "reflect" + "sort" + "testing" + + "github.com/kopia/kopia/internal/storagetesting" + + "github.com/kopia/kopia/block" +) + +func TestManifest(t *testing.T) { + data := map[string][]byte{} + + mgr, err := newManaferForTesting(t, data) + if err != nil { + t.Fatalf("unable to open block manager: %v", mgr) + } + + item1 := map[string]int{"foo": 1, "bar": 2} + item2 := map[string]int{"foo": 2, "bar": 3} + item3 := map[string]int{"foo": 3, "bar": 4} + + labels1 := map[string]string{"color": "red"} + labels2 := map[string]string{"color": "blue", "shape": "square"} + labels3 := map[string]string{"shape": "square", "color": "red"} + + id1 := addAndVerify(t, mgr, labels1, item1) + id2 := addAndVerify(t, mgr, labels2, item2) + id3 := addAndVerify(t, mgr, labels3, item3) + + cases := []struct { + criteria map[string]string + expected []string + }{ + {map[string]string{"color": "red"}, []string{id1, id3}}, + {map[string]string{"color": "blue"}, []string{id2}}, + {map[string]string{"color": "green"}, nil}, + {map[string]string{"color": "red", "shape": "square"}, []string{id3}}, + {map[string]string{"color": "blue", "shape": "square"}, []string{id2}}, + {map[string]string{"color": "red", "shape": "circle"}, nil}, + } + + // verify before flush + for _, tc := range cases { + verifyMatches(t, mgr, tc.criteria, tc.expected) + } + verifyItem(t, mgr, id1, labels1, item1) + verifyItem(t, mgr, id2, labels2, item2) + verifyItem(t, mgr, id3, labels3, item3) + + if err := mgr.Flush(); err != nil { + t.Errorf("flush error: %v", err) + } + if err := mgr.Flush(); err != nil { + t.Errorf("flush error: %v", err) + } + + // verify after flush + for _, tc := range cases { + verifyMatches(t, mgr, tc.criteria, tc.expected) + } + verifyItem(t, mgr, id1, labels1, item1) + verifyItem(t, mgr, id2, labels2, item2) + verifyItem(t, mgr, id3, labels3, item3) + + // verify in new manager + mgr2, err := newManaferForTesting(t, data) + if err != nil { + t.Fatalf("can't open block manager: %v", err) + } + for _, tc := range cases { + verifyMatches(t, mgr2, tc.criteria, tc.expected) + } + verifyItem(t, mgr2, id1, labels1, item1) + verifyItem(t, mgr2, id2, labels2, item2) + verifyItem(t, mgr2, id3, labels3, item3) + if err := mgr2.Flush(); err != nil { + t.Errorf("flush error: %v", err) + } + + // delete from one + mgr.Delete(id3) + verifyItemNotFound(t, mgr, id3) + mgr.Flush() + verifyItemNotFound(t, mgr, id3) + + // still found in another + verifyItem(t, mgr2, id3, labels3, item3) + if err := mgr2.Load(); err != nil { + t.Errorf("unable to load: %v", err) + } +} + +func addAndVerify(t *testing.T, mgr *Manager, labels map[string]string, data map[string]int) string { + t.Helper() + id, err := mgr.Add(labels, data) + if err != nil { + t.Errorf("unable to add %v (%v): %v", labels, data, err) + return "" + } + + verifyItem(t, mgr, id, labels, data) + return id +} + +func verifyItem(t *testing.T, mgr *Manager, id string, labels map[string]string, data map[string]int) { + t.Helper() + var retrieved map[string]int + + l, err := mgr.Get(id, &retrieved) + if err != nil { + t.Errorf("unable to retrieve %q: %v", id, err) + return + } + + if !reflect.DeepEqual(l, labels) { + t.Errorf("invalid labels retrieved %v, wanted %v", l, labels) + } +} + +func verifyItemNotFound(t *testing.T, mgr *Manager, id string) { + t.Helper() + var retrieved map[string]int + + _, err := mgr.Get(id, &retrieved) + if got, want := err, ErrNotFound; got != want { + t.Errorf("invalid error when getting %q %v, expected %v", id, err, ErrNotFound) + return + } +} + +func verifyMatches(t *testing.T, mgr *Manager, labels map[string]string, expected []string) { + t.Helper() + + matches := mgr.Find(labels) + sort.Strings(matches) + sort.Strings(expected) + + if !reflect.DeepEqual(matches, expected) { + t.Errorf("invalid matches for %v: %v, expected %v", labels, matches, expected) + } +} + +func newManaferForTesting(t *testing.T, data map[string][]byte) (*Manager, error) { + formatter, err := block.FormatterFactories["TESTONLY_MD5"](block.FormattingOptions{}) + if err != nil { + panic("can't create formatter") + } + st := storagetesting.NewMapStorage(data) + + return NewManager(block.NewManager(st, 10000, 100000, formatter)) +}