From 2be6041d96967f8a8f0773d4cba2588b0852229f Mon Sep 17 00:00:00 2001 From: Jarek Kowalski Date: Sun, 10 Jun 2018 21:29:58 -0700 Subject: [PATCH] rewrote manifest manager to use distinct committed and pending layers --- manifest/manifest_manager.go | 160 +++++++++++++++++------------- manifest/manifest_manager_test.go | 2 +- 2 files changed, 94 insertions(+), 68 deletions(-) diff --git a/manifest/manifest_manager.go b/manifest/manifest_manager.go index 165305ff0..afd27680d 100644 --- a/manifest/manifest_manager.go +++ b/manifest/manifest_manager.go @@ -25,11 +25,13 @@ // Manager organizes JSON manifests of various kinds, including snapshot manifests type Manager struct { - mu sync.Mutex - b *block.Manager - entries map[string]*manifestEntry - blockIDs []string - pendingEntries []*manifestEntry + mu sync.Mutex + b *block.Manager + + pendingEntries map[string]*manifestEntry + + committedEntries map[string]*manifestEntry + committedBlockIDs map[string]bool } // Put serializes the provided payload to JSON and persists it. Returns unique handle that represents the object. @@ -57,8 +59,7 @@ func (m *Manager) Put(labels map[string]string, payload interface{}) (string, er Content: b, } - m.pendingEntries = append(m.pendingEntries, e) - m.entries[e.ID] = e + m.pendingEntries[e.ID] = e return e.ID, nil } @@ -68,8 +69,12 @@ func (m *Manager) GetMetadata(id string) (*EntryMetadata, error) { m.mu.Lock() defer m.mu.Unlock() - e := m.entries[id] + e := m.pendingEntries[id] if e == nil { + e = m.committedEntries[id] + } + + if e == nil || e.Deleted { return nil, ErrNotFound } @@ -101,8 +106,11 @@ func (m *Manager) GetRaw(id string) ([]byte, error) { m.mu.Lock() defer m.mu.Unlock() - e := m.entries[id] + e := m.pendingEntries[id] if e == nil { + e = m.committedEntries[id] + } + if e == nil || e.Deleted { return nil, ErrNotFound } @@ -115,7 +123,17 @@ func (m *Manager) Find(labels map[string]string) []*EntryMetadata { defer m.mu.Unlock() var matches []*EntryMetadata - for _, e := range m.entries { + for _, e := range m.pendingEntries { + if matchesLabels(e.Labels, labels) { + matches = append(matches, cloneEntryMetadata(e)) + } + } + for _, e := range m.committedEntries { + if m.pendingEntries[e.ID] != nil { + // ignore committed that are also in pending + continue + } + if matchesLabels(e.Labels, labels) { matches = append(matches, cloneEntryMetadata(e)) } @@ -152,17 +170,8 @@ func (m *Manager) Flush(ctx context.Context) error { m.mu.Lock() defer m.mu.Unlock() - blockID, err := m.flushPendingEntriesLocked(ctx) - if err != nil { - return err - } - - if blockID == "" { - return nil - } - - m.blockIDs = append(m.blockIDs, blockID) - return nil + _, err := m.flushPendingEntriesLocked(ctx) + return err } func (m *Manager) flushPendingEntriesLocked(ctx context.Context) (string, error) { @@ -170,8 +179,10 @@ func (m *Manager) flushPendingEntriesLocked(ctx context.Context) (string, error) return "", nil } - man := manifest{ - Entries: m.pendingEntries, + man := manifest{} + + for _, e := range m.pendingEntries { + man.Entries = append(man.Entries, e) } var buf bytes.Buffer @@ -192,55 +203,50 @@ func (m *Manager) flushPendingEntriesLocked(ctx context.Context) (string, error) return "", err } - m.pendingEntries = nil + for _, e := range m.pendingEntries { + m.committedEntries[e.ID] = e + delete(m.pendingEntries, e.ID) + } + + m.committedBlockIDs[blockID] = true + return blockID, nil } // Delete marks the specified manifest ID for deletion. func (m *Manager) Delete(id string) { - if m.entries[id] == nil { + if m.pendingEntries[id] == nil && m.committedEntries[id] == nil { return } - delete(m.entries, id) - m.pendingEntries = append(m.pendingEntries, &manifestEntry{ + m.pendingEntries[id] = &manifestEntry{ ID: id, ModTime: time.Now().UTC(), Deleted: true, - }) + } } -func (m *Manager) load(ctx context.Context) error { - if err := m.Flush(ctx); err != nil { - return err - } - - m.mu.Lock() - defer m.mu.Unlock() - - m.entries = map[string]*manifestEntry{} - +func (m *Manager) loadCommittedBlocks(ctx context.Context) error { log.Debug().Msg("listing manifest blocks") blocks, err := m.b.ListBlocks(manifestBlockPrefix) if err != nil { return fmt.Errorf("unable to list manifest blocks: %v", err) } + m.mu.Lock() + defer m.mu.Unlock() + + m.committedEntries = map[string]*manifestEntry{} + m.committedBlockIDs = map[string]bool{} + log.Printf("found %v manifest blocks", len(blocks)) if err := m.loadManifestBlocks(ctx, blocks); err != nil { return fmt.Errorf("unable to load manifest blocks: %v", err) } - if len(blocks) > autoCompactionBlockCount { - log.Debug().Int("blocks", len(blocks)).Msg("performing automatic compaction") - if err := m.compactLocked(ctx); err != nil { - return fmt.Errorf("unable to compact manifest blocks: %v", err) - } - - if err := m.b.Flush(ctx); err != nil { - return fmt.Errorf("unable to flush blocks after auto-compaction: %v", err) - } + if err := m.maybeCompactLocked(ctx); err != nil { + return fmt.Errorf("error auto-compacting blocks") } return nil @@ -249,7 +255,9 @@ func (m *Manager) load(ctx context.Context) error { func (m *Manager) loadManifestBlocks(ctx context.Context, blockIDs []string) error { t0 := time.Now() - m.blockIDs = append(m.blockIDs, blockIDs...) + for _, b := range blockIDs { + m.committedBlockIDs[b] = true + } manifests, err := m.loadBlocksInParallel(ctx, blockIDs) if err != nil { @@ -263,11 +271,12 @@ func (m *Manager) loadManifestBlocks(ctx context.Context, blockIDs []string) err } // after merging, remove blocks marked as deleted. - for k, e := range m.entries { + for k, e := range m.committedEntries { if e.Deleted { - delete(m.entries, k) + delete(m.committedEntries, k) } } + log.Debug().Dur("duration_ms", time.Since(t0)).Msgf("finished loading manifest blocks.") return nil @@ -355,15 +364,32 @@ func (m *Manager) Compact(ctx context.Context) error { return m.compactLocked(ctx) } -func (m *Manager) compactLocked(ctx context.Context) error { - log.Printf("compactLocked: pendingEntries=%v blockIDs=%v", len(m.pendingEntries), len(m.blockIDs)) - - if len(m.blockIDs) == 1 && len(m.pendingEntries) == 0 { +func (m *Manager) maybeCompactLocked(ctx context.Context) error { + if len(m.committedBlockIDs) < autoCompactionBlockCount { return nil } - for _, e := range m.entries { - m.pendingEntries = append(m.pendingEntries, e) + log.Debug().Int("blocks", len(m.committedBlockIDs)).Msg("performing automatic compaction") + if err := m.compactLocked(ctx); err != nil { + return fmt.Errorf("unable to compact manifest blocks: %v", err) + } + + if err := m.b.Flush(ctx); err != nil { + return fmt.Errorf("unable to flush blocks after auto-compaction: %v", err) + } + + return nil +} + +func (m *Manager) compactLocked(ctx context.Context) error { + log.Printf("compactLocked: pendingEntries=%v blockIDs=%v", len(m.pendingEntries), len(m.committedBlockIDs)) + + if len(m.committedBlockIDs) == 1 && len(m.pendingEntries) == 0 { + return nil + } + + for _, e := range m.committedEntries { + m.pendingEntries[e.ID] = e } blockID, err := m.flushPendingEntriesLocked(ctx) @@ -372,9 +398,7 @@ func (m *Manager) compactLocked(ctx context.Context) error { } // add the newly-created block to the list, could be duplicate - m.blockIDs = append(m.blockIDs, blockID) - - for _, b := range m.blockIDs { + for b := range m.committedBlockIDs { if b == blockID { // do not delete block that was just written. continue @@ -383,22 +407,22 @@ func (m *Manager) compactLocked(ctx context.Context) error { if err := m.b.DeleteBlock(b); err != nil { return fmt.Errorf("unable to delete block %q: %v", b, err) } + + delete(m.committedBlockIDs, b) } - // all previous blocks were deleted, now we have a new block - m.blockIDs = []string{blockID} return nil } func (m *Manager) mergeEntry(e *manifestEntry) { - prev := m.entries[e.ID] + prev := m.committedEntries[e.ID] if prev == nil { - m.entries[e.ID] = e + m.committedEntries[e.ID] = e return } if e.ModTime.After(prev.ModTime) { - m.entries[e.ID] = e + m.committedEntries[e.ID] = e } } @@ -413,11 +437,13 @@ func copyLabels(m map[string]string) map[string]string { // NewManager returns new manifest manager for the provided block manager. func NewManager(ctx context.Context, b *block.Manager) (*Manager, error) { m := &Manager{ - b: b, - entries: map[string]*manifestEntry{}, + b: b, + pendingEntries: map[string]*manifestEntry{}, + committedEntries: map[string]*manifestEntry{}, + committedBlockIDs: map[string]bool{}, } - if err := m.load(ctx); err != nil { + if err := m.loadCommittedBlocks(ctx); err != nil { return nil, err } diff --git a/manifest/manifest_manager_test.go b/manifest/manifest_manager_test.go index 3968ace29..39519d086 100644 --- a/manifest/manifest_manager_test.go +++ b/manifest/manifest_manager_test.go @@ -92,7 +92,7 @@ func TestManifest(t *testing.T) { // still found in another verifyItem(t, mgr2, id3, labels3, item3) - if err := mgr2.load(ctx); err != nil { + if err := mgr2.loadCommittedBlocks(ctx); err != nil { t.Errorf("unable to load: %v", err) }