mirror of
https://github.com/kopia/kopia.git
synced 2026-03-11 18:56:28 -04:00
Eliminated busy loop after snapshot failure (#658)
* server: if a snapshot fails, don't start the next one for 5 minutes or until the next successful refresh.
* Makefile: don't print skipped tests.
This commit is contained in:
2
Makefile
2
Makefile
@@ -15,7 +15,7 @@ endif
|
||||
|
||||
include tools/tools.mk
|
||||
|
||||
GO_TEST=$(gotestsum) --format=pkgname-and-test-fails --
|
||||
GO_TEST=$(gotestsum) --format=pkgname-and-test-fails --no-summary=skipped --
|
||||
|
||||
LINTER_DEADLINE=300s
|
||||
UNIT_TESTS_TIMEOUT=300s
|
||||
|
||||
@@ -5,6 +5,8 @@
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
|
||||
"github.com/kopia/kopia/fs/localfs"
|
||||
"github.com/kopia/kopia/internal/clock"
|
||||
"github.com/kopia/kopia/internal/ctxutil"
|
||||
@@ -15,8 +17,10 @@
|
||||
)
|
||||
|
||||
const (
|
||||
statusRefreshInterval = 15 * time.Second // how frequently to refresh source status
|
||||
oneDay = 24 * time.Hour
|
||||
statusRefreshInterval = 15 * time.Second // how frequently to refresh source status
|
||||
failedSnapshotRetryInterval = 5 * time.Minute
|
||||
refreshTimeout = 30 * time.Second // max amount of time to refresh a single source
|
||||
oneDay = 24 * time.Hour
|
||||
)
|
||||
|
||||
// sourceManager manages the state machine of each source
|
||||
@@ -138,12 +142,27 @@ func (s *sourceManager) runLocal(ctx context.Context) {
|
||||
|
||||
case <-time.After(waitTime):
|
||||
log(ctx).Debugf("snapshotting %v", s.src)
|
||||
s.snapshot(ctx)
|
||||
s.refreshStatus(ctx)
|
||||
|
||||
if err := s.snapshot(ctx); err != nil {
|
||||
log(ctx).Errorf("snapshot error: %v", err)
|
||||
|
||||
s.backoffBeforeNextSnapshot()
|
||||
} else {
|
||||
s.refreshStatus(ctx)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// backoffBeforeNextSnapshot sets the next snapshot time to
// failedSnapshotRetryInterval from now, unless no next snapshot time is set.
// runLocal calls it when s.snapshot returns an error.
func (s *sourceManager) backoffBeforeNextSnapshot() {
|
||||
// Leave a nil next-snapshot time untouched.
// NOTE(review): presumably nil means no snapshot is scheduled for this
// source - confirm against findClosestNextSnapshotTime.
if s.nextSnapshotTime == nil {
|
||||
return
|
||||
}
|
||||
|
||||
// Unconditionally overwrite the previously computed time with
// "now + retry interval"; the old value is not compared against the new one.
t := clock.Now().Add(failedSnapshotRetryInterval)
|
||||
s.nextSnapshotTime = &t
|
||||
}
|
||||
|
||||
func (s *sourceManager) runRemote(ctx context.Context) {
|
||||
s.refreshStatus(ctx)
|
||||
s.setStatus("REMOTE")
|
||||
@@ -199,7 +218,7 @@ func (s *sourceManager) waitUntilStopped(ctx context.Context) {
|
||||
log(ctx).Debugf("source manager for %v has stopped", s.src)
|
||||
}
|
||||
|
||||
func (s *sourceManager) snapshot(ctx context.Context) {
|
||||
func (s *sourceManager) snapshot(ctx context.Context) error {
|
||||
s.setStatus("PENDING")
|
||||
|
||||
s.server.beginUpload(ctx, s.src)
|
||||
@@ -211,53 +230,50 @@ func (s *sourceManager) snapshot(ctx context.Context) {
|
||||
select {
|
||||
case <-s.closed:
|
||||
log(ctx).Infof("not snapshotting %v because source manager is shutting down", s.src)
|
||||
return
|
||||
return nil
|
||||
|
||||
default:
|
||||
}
|
||||
|
||||
localEntry, err := localfs.NewEntry(s.src.Path)
|
||||
if err != nil {
|
||||
log(ctx).Errorf("unable to create local filesystem: %v", err)
|
||||
return
|
||||
return errors.Wrap(err, "unable to create local filesystem")
|
||||
}
|
||||
|
||||
u := snapshotfs.NewUploader(s.server.rep)
|
||||
|
||||
policyTree, err := policy.TreeForSource(ctx, s.server.rep, s.src)
|
||||
if err != nil {
|
||||
log(ctx).Errorf("unable to create policy getter: %v", err)
|
||||
return errors.Wrap(err, "unable to create policy getter")
|
||||
}
|
||||
|
||||
u.Progress = s.progress
|
||||
|
||||
log(ctx).Infof("starting upload of %v", s.src)
|
||||
log(ctx).Debugf("starting upload of %v", s.src)
|
||||
s.setUploader(u)
|
||||
manifest, err := u.Upload(ctx, localEntry, policyTree, s.src, s.manifestsSinceLastCompleteSnapshot...)
|
||||
s.setUploader(nil)
|
||||
|
||||
if err != nil {
|
||||
log(ctx).Errorf("upload error: %v", err)
|
||||
return
|
||||
return errors.Wrap(err, "upload error")
|
||||
}
|
||||
|
||||
snapshotID, err := snapshot.SaveSnapshot(ctx, s.server.rep, manifest)
|
||||
if err != nil {
|
||||
log(ctx).Errorf("unable to save snapshot: %v", err)
|
||||
return
|
||||
return errors.Wrap(err, "unable to save snapshot")
|
||||
}
|
||||
|
||||
if _, err := policy.ApplyRetentionPolicy(ctx, s.server.rep, s.src, true); err != nil {
|
||||
log(ctx).Errorf("unable to apply retention policy: %v", err)
|
||||
return
|
||||
return errors.Wrap(err, "unable to apply retention policy")
|
||||
}
|
||||
|
||||
log(ctx).Infof("created snapshot %v", snapshotID)
|
||||
log(ctx).Debugf("created snapshot %v", snapshotID)
|
||||
|
||||
if err := s.server.rep.Flush(ctx); err != nil {
|
||||
log(ctx).Errorf("unable to flush: %v", err)
|
||||
return
|
||||
return errors.Wrap(err, "unable to flush")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *sourceManager) findClosestNextSnapshotTime() *time.Time {
|
||||
@@ -287,6 +303,9 @@ func (s *sourceManager) findClosestNextSnapshotTime() *time.Time {
|
||||
}
|
||||
|
||||
func (s *sourceManager) refreshStatus(ctx context.Context) {
|
||||
ctx, cancel := context.WithTimeout(ctx, refreshTimeout)
|
||||
defer cancel()
|
||||
|
||||
log(ctx).Debugf("refreshing state for %v", s.src)
|
||||
|
||||
pol, _, err := policy.GetEffectivePolicy(ctx, s.server.rep, s.src)
|
||||
|
||||
Reference in New Issue
Block a user