From df3129682c4f78cdabcfc69f060d62a8c4ffa86a Mon Sep 17 00:00:00 2001 From: Jarek Kowalski Date: Sun, 10 Jun 2018 21:42:22 -0700 Subject: [PATCH] Added repo.Repository.Refresh{,Periodically} --- block/block_manager.go | 47 +++++++++++++++------------------- block/block_manager_test.go | 2 +- block/committed_block_index.go | 10 ++++++-- internal/server/api_error.go | 4 --- manifest/manifest_manager.go | 5 ++++ repo/repository.go | 32 ++++++++++++++++++----- 6 files changed, 59 insertions(+), 41 deletions(-) diff --git a/block/block_manager.go b/block/block_manager.go index d522f5b2e..22aa44b85 100644 --- a/block/block_manager.go +++ b/block/block_manager.go @@ -10,7 +10,6 @@ "io" "math/rand" "os" - "path/filepath" "reflect" "sort" "strings" @@ -36,8 +35,6 @@ autoCompactionMinBlockCount = 4 * parallelFetches autoCompactionMaxBlockCount = 64 - defaultActiveBlocksExtraTime = 10 * time.Minute - currentWriteVersion = 1 minSupportedReadVersion = 0 maxSupportedReadVersion = currentWriteVersion @@ -74,7 +71,6 @@ type Manager struct { committedBlocks committedBlockIndex flushPackIndexesAfter time.Time // time when those indexes should be flushed - activeBlocksExtraTime time.Duration closed chan struct{} @@ -1029,21 +1025,17 @@ func listIndexBlocksFromStorage(ctx context.Context, st storage.Storage) ([]Inde // NewManager creates new block manager with given packing options and a formatter. func NewManager(ctx context.Context, st storage.Storage, f FormattingOptions, caching CachingOptions) (*Manager, error) { - return newManagerWithOptions(ctx, st, f, caching, time.Now, defaultActiveBlocksExtraTime) + return newManagerWithOptions(ctx, st, f, caching, time.Now) } -func newManagerWithOptions(ctx context.Context, st storage.Storage, f FormattingOptions, caching CachingOptions, timeNow func() time.Time, activeBlocksExtraTime time.Duration) (*Manager, error) { +func newManagerWithOptions(ctx context.Context, st storage.Storage, f FormattingOptions, caching CachingOptions, timeNow func() time.Time) (*Manager, error) { if f.Version < minSupportedReadVersion || f.Version > currentWriteVersion { return nil, fmt.Errorf("can't handle repositories created using version %v (min supported %v, max supported %v)", f.Version, minSupportedReadVersion, maxSupportedReadVersion) } - sf := FormatterFactories[f.BlockFormat] - if sf == nil { - return nil, fmt.Errorf("unsupported block format: %v", f.BlockFormat) - } - formatter, err := sf(f) + formatter, err := createFormatter(f) if err != nil { - return nil, err + return nil, fmt.Errorf("unable to create block formatter: %v", err) } blockCache, err := newBlockCache(ctx, st, caching) @@ -1056,14 +1048,9 @@ func newManagerWithOptions(ctx context.Context, st storage.Storage, f Formatting return nil, fmt.Errorf("unable to initialize list cache: %v", err) } - var cbi committedBlockIndex - if caching.CacheDirectory != "" { - cbi, err = newSimpleCommittedBlockIndex(filepath.Join(caching.CacheDirectory, "indexes")) - if err != nil { - return nil, fmt.Errorf("unable to initialize block index cache: %v", err) - } - } else { - cbi = newCommittedBlockIndex() + blockIndex, err := newCommittedBlockIndex(caching) + if err != nil { + return nil, fmt.Errorf("unable to initialize committed block index: %v", err) } m := &Manager{ @@ -1074,20 +1061,17 @@ func newManagerWithOptions(ctx context.Context, st storage.Storage, f Formatting formatter: formatter, currentPackItems: make(map[string]Info), packIndexBuilder: packindex.NewBuilder(), - committedBlocks: cbi, + committedBlocks: blockIndex, minPreambleLength: defaultMinPreambleLength, maxPreambleLength: defaultMaxPreambleLength, paddingUnit: defaultPaddingUnit, blockCache: blockCache, listCache: listCache, st: st, - activeBlocksExtraTime: activeBlocksExtraTime, - writeFormatVersion: int32(f.Version), - closed: make(chan struct{}), - } - if os.Getenv("KOPIA_VERIFY_INVARIANTS") != "" { - m.checkInvariantsOnUnlock = true + writeFormatVersion: int32(f.Version), + closed: make(chan struct{}), + checkInvariantsOnUnlock: os.Getenv("KOPIA_VERIFY_INVARIANTS") != "", } m.startPackIndexLocked() @@ -1098,3 +1082,12 @@ func newManagerWithOptions(ctx context.Context, st storage.Storage, f Formatting return m, nil } + +func createFormatter(f FormattingOptions) (Formatter, error) { + sf := FormatterFactories[f.BlockFormat] + if sf == nil { + return nil, fmt.Errorf("unsupported block format: %v", f.BlockFormat) + } + + return sf(f) +} diff --git a/block/block_manager_test.go b/block/block_manager_test.go index 84530842b..900baec2d 100644 --- a/block/block_manager_test.go +++ b/block/block_manager_test.go @@ -588,7 +588,7 @@ func newTestBlockManager(data map[string][]byte, keyTime map[string]time.Time, t bm, err := newManagerWithOptions(context.Background(), st, FormattingOptions{ BlockFormat: "TESTONLY_MD5", MaxPackSize: maxPackSize, - }, CachingOptions{}, timeFunc, 0) + }, CachingOptions{}, timeFunc) if err != nil { panic("can't create block manager: " + err.Error()) } diff --git a/block/committed_block_index.go b/block/committed_block_index.go index 133794de5..3eca89b3e 100644 --- a/block/committed_block_index.go +++ b/block/committed_block_index.go @@ -1,6 +1,8 @@ package block import ( + "path/filepath" + "github.com/kopia/kopia/internal/packindex" ) @@ -14,9 +16,13 @@ type committedBlockIndex interface { use(indexBlockIDs []string) (bool, error) } -func newCommittedBlockIndex() committedBlockIndex { +func newCommittedBlockIndex(caching CachingOptions) (committedBlockIndex, error) { + if caching.CacheDirectory != "" { + return newSimpleCommittedBlockIndex(filepath.Join(caching.CacheDirectory, "indexes")) + } + return &inMemoryCommittedBlockIndex{ cachedPhysicalBlocks: make(map[string]packindex.Index), usedPhysicalBlocks: make(map[string]packindex.Index), - } + }, nil } diff --git a/internal/server/api_error.go b/internal/server/api_error.go index 7817ba4d7..5889e4776 100644 --- a/internal/server/api_error.go +++ b/internal/server/api_error.go @@ -9,10 +9,6 @@ type apiError struct { message string } -func malformedRequestError() *apiError { - return &apiError{400, "malformed request"} -} - func internalServerError(err error) *apiError { return &apiError{500, fmt.Sprintf("internal server error: %v", err)} } diff --git a/manifest/manifest_manager.go b/manifest/manifest_manager.go index afd27680d..1598c5351 100644 --- a/manifest/manifest_manager.go +++ b/manifest/manifest_manager.go @@ -226,6 +226,11 @@ func (m *Manager) Delete(id string) { } } +// Refresh updates the committed blocks from the underlying storage. +func (m *Manager) Refresh(ctx context.Context) error { + return m.loadCommittedBlocks(ctx) +} + func (m *Manager) loadCommittedBlocks(ctx context.Context) error { log.Debug().Msg("listing manifest blocks") blocks, err := m.b.ListBlocks(manifestBlockPrefix) diff --git a/repo/repository.go b/repo/repository.go index 728af0b68..3f9129ad7 100644 --- a/repo/repository.go +++ b/repo/repository.go @@ -2,6 +2,7 @@ import ( "context" + "fmt" "time" "github.com/kopia/kopia/auth" @@ -54,6 +55,28 @@ func (r *Repository) Flush(ctx context.Context) error { return r.Blocks.Flush(ctx) } +// Refresh periodically makes external changes visible to repository. +func (r *Repository) Refresh(ctx context.Context) error { + updated, err := r.Blocks.Refresh(ctx) + if err != nil { + return fmt.Errorf("error refreshing block index: %v", err) + } + + if !updated { + return nil + } + + log.Printf("block index refreshed") + + if err := r.Manifests.Refresh(ctx); err != nil { + return fmt.Errorf("error reloading manifests: %v", err) + } + + log.Printf("manifests refreshed") + + return nil +} + // RefreshPeriodically periodically refreshes the repository to reflect the changes made by other hosts. func (r *Repository) RefreshPeriodically(ctx context.Context, interval time.Duration) { for { @@ -62,13 +85,8 @@ func (r *Repository) RefreshPeriodically(ctx context.Context, interval time.Dura return case <-time.After(interval): - if updated, err := r.Blocks.Refresh(ctx); err != nil { - log.Warn().Msgf("error reloading indexes: %v", err) - } else if updated { - log.Printf("reloaded indexes") - // if err := r.Manifests.Refresh(ctx); err != nil { - // log.Warn().Msgf("error reloading manifests: %v", err) - // } + if err := r.Refresh(ctx); err != nil { + log.Warn().Msgf("error refreshing repository: %v", err) } } }