rewrote manifest manager to use distinct committed and pending layers

This commit is contained in:
Jarek Kowalski
2018-06-10 21:29:58 -07:00
parent 22b0766570
commit 2be6041d96
2 changed files with 94 additions and 68 deletions

View File

@@ -25,11 +25,13 @@
// Manager organizes JSON manifests of various kinds, including snapshot manifests
type Manager struct {
mu sync.Mutex
b *block.Manager
entries map[string]*manifestEntry
blockIDs []string
pendingEntries []*manifestEntry
mu sync.Mutex
b *block.Manager
pendingEntries map[string]*manifestEntry
committedEntries map[string]*manifestEntry
committedBlockIDs map[string]bool
}
// Put serializes the provided payload to JSON and persists it. Returns unique handle that represents the object.
@@ -57,8 +59,7 @@ func (m *Manager) Put(labels map[string]string, payload interface{}) (string, er
Content: b,
}
m.pendingEntries = append(m.pendingEntries, e)
m.entries[e.ID] = e
m.pendingEntries[e.ID] = e
return e.ID, nil
}
@@ -68,8 +69,12 @@ func (m *Manager) GetMetadata(id string) (*EntryMetadata, error) {
m.mu.Lock()
defer m.mu.Unlock()
e := m.entries[id]
e := m.pendingEntries[id]
if e == nil {
e = m.committedEntries[id]
}
if e == nil || e.Deleted {
return nil, ErrNotFound
}
@@ -101,8 +106,11 @@ func (m *Manager) GetRaw(id string) ([]byte, error) {
m.mu.Lock()
defer m.mu.Unlock()
e := m.entries[id]
e := m.pendingEntries[id]
if e == nil {
e = m.committedEntries[id]
}
if e == nil || e.Deleted {
return nil, ErrNotFound
}
@@ -115,7 +123,17 @@ func (m *Manager) Find(labels map[string]string) []*EntryMetadata {
defer m.mu.Unlock()
var matches []*EntryMetadata
for _, e := range m.entries {
for _, e := range m.pendingEntries {
if matchesLabels(e.Labels, labels) {
matches = append(matches, cloneEntryMetadata(e))
}
}
for _, e := range m.committedEntries {
if m.pendingEntries[e.ID] != nil {
// ignore committed that are also in pending
continue
}
if matchesLabels(e.Labels, labels) {
matches = append(matches, cloneEntryMetadata(e))
}
@@ -152,17 +170,8 @@ func (m *Manager) Flush(ctx context.Context) error {
m.mu.Lock()
defer m.mu.Unlock()
blockID, err := m.flushPendingEntriesLocked(ctx)
if err != nil {
return err
}
if blockID == "" {
return nil
}
m.blockIDs = append(m.blockIDs, blockID)
return nil
_, err := m.flushPendingEntriesLocked(ctx)
return err
}
func (m *Manager) flushPendingEntriesLocked(ctx context.Context) (string, error) {
@@ -170,8 +179,10 @@ func (m *Manager) flushPendingEntriesLocked(ctx context.Context) (string, error)
return "", nil
}
man := manifest{
Entries: m.pendingEntries,
man := manifest{}
for _, e := range m.pendingEntries {
man.Entries = append(man.Entries, e)
}
var buf bytes.Buffer
@@ -192,55 +203,50 @@ func (m *Manager) flushPendingEntriesLocked(ctx context.Context) (string, error)
return "", err
}
m.pendingEntries = nil
for _, e := range m.pendingEntries {
m.committedEntries[e.ID] = e
delete(m.pendingEntries, e.ID)
}
m.committedBlockIDs[blockID] = true
return blockID, nil
}
// Delete marks the specified manifest ID for deletion.
func (m *Manager) Delete(id string) {
if m.entries[id] == nil {
if m.pendingEntries[id] == nil && m.committedEntries[id] == nil {
return
}
delete(m.entries, id)
m.pendingEntries = append(m.pendingEntries, &manifestEntry{
m.pendingEntries[id] = &manifestEntry{
ID: id,
ModTime: time.Now().UTC(),
Deleted: true,
})
}
}
func (m *Manager) load(ctx context.Context) error {
if err := m.Flush(ctx); err != nil {
return err
}
m.mu.Lock()
defer m.mu.Unlock()
m.entries = map[string]*manifestEntry{}
func (m *Manager) loadCommittedBlocks(ctx context.Context) error {
log.Debug().Msg("listing manifest blocks")
blocks, err := m.b.ListBlocks(manifestBlockPrefix)
if err != nil {
return fmt.Errorf("unable to list manifest blocks: %v", err)
}
m.mu.Lock()
defer m.mu.Unlock()
m.committedEntries = map[string]*manifestEntry{}
m.committedBlockIDs = map[string]bool{}
log.Printf("found %v manifest blocks", len(blocks))
if err := m.loadManifestBlocks(ctx, blocks); err != nil {
return fmt.Errorf("unable to load manifest blocks: %v", err)
}
if len(blocks) > autoCompactionBlockCount {
log.Debug().Int("blocks", len(blocks)).Msg("performing automatic compaction")
if err := m.compactLocked(ctx); err != nil {
return fmt.Errorf("unable to compact manifest blocks: %v", err)
}
if err := m.b.Flush(ctx); err != nil {
return fmt.Errorf("unable to flush blocks after auto-compaction: %v", err)
}
if err := m.maybeCompactLocked(ctx); err != nil {
return fmt.Errorf("error auto-compacting blocks")
}
return nil
@@ -249,7 +255,9 @@ func (m *Manager) load(ctx context.Context) error {
func (m *Manager) loadManifestBlocks(ctx context.Context, blockIDs []string) error {
t0 := time.Now()
m.blockIDs = append(m.blockIDs, blockIDs...)
for _, b := range blockIDs {
m.committedBlockIDs[b] = true
}
manifests, err := m.loadBlocksInParallel(ctx, blockIDs)
if err != nil {
@@ -263,11 +271,12 @@ func (m *Manager) loadManifestBlocks(ctx context.Context, blockIDs []string) err
}
// after merging, remove blocks marked as deleted.
for k, e := range m.entries {
for k, e := range m.committedEntries {
if e.Deleted {
delete(m.entries, k)
delete(m.committedEntries, k)
}
}
log.Debug().Dur("duration_ms", time.Since(t0)).Msgf("finished loading manifest blocks.")
return nil
@@ -355,15 +364,32 @@ func (m *Manager) Compact(ctx context.Context) error {
return m.compactLocked(ctx)
}
func (m *Manager) compactLocked(ctx context.Context) error {
log.Printf("compactLocked: pendingEntries=%v blockIDs=%v", len(m.pendingEntries), len(m.blockIDs))
if len(m.blockIDs) == 1 && len(m.pendingEntries) == 0 {
func (m *Manager) maybeCompactLocked(ctx context.Context) error {
if len(m.committedBlockIDs) < autoCompactionBlockCount {
return nil
}
for _, e := range m.entries {
m.pendingEntries = append(m.pendingEntries, e)
log.Debug().Int("blocks", len(m.committedBlockIDs)).Msg("performing automatic compaction")
if err := m.compactLocked(ctx); err != nil {
return fmt.Errorf("unable to compact manifest blocks: %v", err)
}
if err := m.b.Flush(ctx); err != nil {
return fmt.Errorf("unable to flush blocks after auto-compaction: %v", err)
}
return nil
}
func (m *Manager) compactLocked(ctx context.Context) error {
log.Printf("compactLocked: pendingEntries=%v blockIDs=%v", len(m.pendingEntries), len(m.committedBlockIDs))
if len(m.committedBlockIDs) == 1 && len(m.pendingEntries) == 0 {
return nil
}
for _, e := range m.committedEntries {
m.pendingEntries[e.ID] = e
}
blockID, err := m.flushPendingEntriesLocked(ctx)
@@ -372,9 +398,7 @@ func (m *Manager) compactLocked(ctx context.Context) error {
}
// add the newly-created block to the list, could be duplicate
m.blockIDs = append(m.blockIDs, blockID)
for _, b := range m.blockIDs {
for b := range m.committedBlockIDs {
if b == blockID {
// do not delete block that was just written.
continue
@@ -383,22 +407,22 @@ func (m *Manager) compactLocked(ctx context.Context) error {
if err := m.b.DeleteBlock(b); err != nil {
return fmt.Errorf("unable to delete block %q: %v", b, err)
}
delete(m.committedBlockIDs, b)
}
// all previous blocks were deleted, now we have a new block
m.blockIDs = []string{blockID}
return nil
}
func (m *Manager) mergeEntry(e *manifestEntry) {
prev := m.entries[e.ID]
prev := m.committedEntries[e.ID]
if prev == nil {
m.entries[e.ID] = e
m.committedEntries[e.ID] = e
return
}
if e.ModTime.After(prev.ModTime) {
m.entries[e.ID] = e
m.committedEntries[e.ID] = e
}
}
@@ -413,11 +437,13 @@ func copyLabels(m map[string]string) map[string]string {
// NewManager returns new manifest manager for the provided block manager.
func NewManager(ctx context.Context, b *block.Manager) (*Manager, error) {
m := &Manager{
b: b,
entries: map[string]*manifestEntry{},
b: b,
pendingEntries: map[string]*manifestEntry{},
committedEntries: map[string]*manifestEntry{},
committedBlockIDs: map[string]bool{},
}
if err := m.load(ctx); err != nil {
if err := m.loadCommittedBlocks(ctx); err != nil {
return nil, err
}

View File

@@ -92,7 +92,7 @@ func TestManifest(t *testing.T) {
// still found in another
verifyItem(t, mgr2, id3, labels3, item3)
if err := mgr2.load(ctx); err != nil {
if err := mgr2.loadCommittedBlocks(ctx); err != nil {
t.Errorf("unable to load: %v", err)
}