Added repo.Repository.Refresh{,Periodically}

This commit is contained in:
Jarek Kowalski
2018-06-10 21:42:22 -07:00
parent 2be6041d96
commit df3129682c
6 changed files with 59 additions and 41 deletions

View File

@@ -10,7 +10,6 @@
"io"
"math/rand"
"os"
"path/filepath"
"reflect"
"sort"
"strings"
@@ -36,8 +35,6 @@
autoCompactionMinBlockCount = 4 * parallelFetches
autoCompactionMaxBlockCount = 64
defaultActiveBlocksExtraTime = 10 * time.Minute
currentWriteVersion = 1
minSupportedReadVersion = 0
maxSupportedReadVersion = currentWriteVersion
@@ -74,7 +71,6 @@ type Manager struct {
committedBlocks committedBlockIndex
flushPackIndexesAfter time.Time // time when those indexes should be flushed
activeBlocksExtraTime time.Duration
closed chan struct{}
@@ -1029,21 +1025,17 @@ func listIndexBlocksFromStorage(ctx context.Context, st storage.Storage) ([]Inde
// NewManager creates new block manager with given packing options and a formatter.
func NewManager(ctx context.Context, st storage.Storage, f FormattingOptions, caching CachingOptions) (*Manager, error) {
return newManagerWithOptions(ctx, st, f, caching, time.Now, defaultActiveBlocksExtraTime)
return newManagerWithOptions(ctx, st, f, caching, time.Now)
}
func newManagerWithOptions(ctx context.Context, st storage.Storage, f FormattingOptions, caching CachingOptions, timeNow func() time.Time, activeBlocksExtraTime time.Duration) (*Manager, error) {
func newManagerWithOptions(ctx context.Context, st storage.Storage, f FormattingOptions, caching CachingOptions, timeNow func() time.Time) (*Manager, error) {
if f.Version < minSupportedReadVersion || f.Version > currentWriteVersion {
return nil, fmt.Errorf("can't handle repositories created using version %v (min supported %v, max supported %v)", f.Version, minSupportedReadVersion, maxSupportedReadVersion)
}
sf := FormatterFactories[f.BlockFormat]
if sf == nil {
return nil, fmt.Errorf("unsupported block format: %v", f.BlockFormat)
}
formatter, err := sf(f)
formatter, err := createFormatter(f)
if err != nil {
return nil, err
return nil, fmt.Errorf("unable to create block formatter: %v", err)
}
blockCache, err := newBlockCache(ctx, st, caching)
@@ -1056,14 +1048,9 @@ func newManagerWithOptions(ctx context.Context, st storage.Storage, f Formatting
return nil, fmt.Errorf("unable to initialize list cache: %v", err)
}
var cbi committedBlockIndex
if caching.CacheDirectory != "" {
cbi, err = newSimpleCommittedBlockIndex(filepath.Join(caching.CacheDirectory, "indexes"))
if err != nil {
return nil, fmt.Errorf("unable to initialize block index cache: %v", err)
}
} else {
cbi = newCommittedBlockIndex()
blockIndex, err := newCommittedBlockIndex(caching)
if err != nil {
return nil, fmt.Errorf("unable to initialize committed block index: %v", err)
}
m := &Manager{
@@ -1074,20 +1061,17 @@ func newManagerWithOptions(ctx context.Context, st storage.Storage, f Formatting
formatter: formatter,
currentPackItems: make(map[string]Info),
packIndexBuilder: packindex.NewBuilder(),
committedBlocks: cbi,
committedBlocks: blockIndex,
minPreambleLength: defaultMinPreambleLength,
maxPreambleLength: defaultMaxPreambleLength,
paddingUnit: defaultPaddingUnit,
blockCache: blockCache,
listCache: listCache,
st: st,
activeBlocksExtraTime: activeBlocksExtraTime,
writeFormatVersion: int32(f.Version),
closed: make(chan struct{}),
}
if os.Getenv("KOPIA_VERIFY_INVARIANTS") != "" {
m.checkInvariantsOnUnlock = true
writeFormatVersion: int32(f.Version),
closed: make(chan struct{}),
checkInvariantsOnUnlock: os.Getenv("KOPIA_VERIFY_INVARIANTS") != "",
}
m.startPackIndexLocked()
@@ -1098,3 +1082,12 @@ func newManagerWithOptions(ctx context.Context, st storage.Storage, f Formatting
return m, nil
}
func createFormatter(f FormattingOptions) (Formatter, error) {
sf := FormatterFactories[f.BlockFormat]
if sf == nil {
return nil, fmt.Errorf("unsupported block format: %v", f.BlockFormat)
}
return sf(f)
}

View File

@@ -588,7 +588,7 @@ func newTestBlockManager(data map[string][]byte, keyTime map[string]time.Time, t
bm, err := newManagerWithOptions(context.Background(), st, FormattingOptions{
BlockFormat: "TESTONLY_MD5",
MaxPackSize: maxPackSize,
}, CachingOptions{}, timeFunc, 0)
}, CachingOptions{}, timeFunc)
if err != nil {
panic("can't create block manager: " + err.Error())
}

View File

@@ -1,6 +1,8 @@
package block
import (
"path/filepath"
"github.com/kopia/kopia/internal/packindex"
)
@@ -14,9 +16,13 @@ type committedBlockIndex interface {
use(indexBlockIDs []string) (bool, error)
}
func newCommittedBlockIndex() committedBlockIndex {
func newCommittedBlockIndex(caching CachingOptions) (committedBlockIndex, error) {
if caching.CacheDirectory != "" {
return newSimpleCommittedBlockIndex(filepath.Join(caching.CacheDirectory, "indexes"))
}
return &inMemoryCommittedBlockIndex{
cachedPhysicalBlocks: make(map[string]packindex.Index),
usedPhysicalBlocks: make(map[string]packindex.Index),
}
}, nil
}

View File

@@ -9,10 +9,6 @@ type apiError struct {
message string
}
func malformedRequestError() *apiError {
return &apiError{400, "malformed request"}
}
func internalServerError(err error) *apiError {
return &apiError{500, fmt.Sprintf("internal server error: %v", err)}
}

View File

@@ -226,6 +226,11 @@ func (m *Manager) Delete(id string) {
}
}
// Refresh updates the committed blocks from the underlying storage.
func (m *Manager) Refresh(ctx context.Context) error {
return m.loadCommittedBlocks(ctx)
}
func (m *Manager) loadCommittedBlocks(ctx context.Context) error {
log.Debug().Msg("listing manifest blocks")
blocks, err := m.b.ListBlocks(manifestBlockPrefix)

View File

@@ -2,6 +2,7 @@
import (
"context"
"fmt"
"time"
"github.com/kopia/kopia/auth"
@@ -54,6 +55,28 @@ func (r *Repository) Flush(ctx context.Context) error {
return r.Blocks.Flush(ctx)
}
// Refresh periodically makes external changes visible to repository.
func (r *Repository) Refresh(ctx context.Context) error {
updated, err := r.Blocks.Refresh(ctx)
if err != nil {
return fmt.Errorf("error refreshing block index: %v", err)
}
if !updated {
return nil
}
log.Printf("block index refreshed")
if err := r.Manifests.Refresh(ctx); err != nil {
return fmt.Errorf("error reloading manifests: %v", err)
}
log.Printf("manifests refreshed")
return nil
}
// RefreshPeriodically periodically refreshes the repository to reflect the changes made by other hosts.
func (r *Repository) RefreshPeriodically(ctx context.Context, interval time.Duration) {
for {
@@ -62,13 +85,8 @@ func (r *Repository) RefreshPeriodically(ctx context.Context, interval time.Dura
return
case <-time.After(interval):
if updated, err := r.Blocks.Refresh(ctx); err != nil {
log.Warn().Msgf("error reloading indexes: %v", err)
} else if updated {
log.Printf("reloaded indexes")
// if err := r.Manifests.Refresh(ctx); err != nil {
// log.Warn().Msgf("error reloading manifests: %v", err)
// }
if err := r.Refresh(ctx); err != nil {
log.Warn().Msgf("error refreshing repository: %v", err)
}
}
}