From f66fe5789ec3edb174c108bb074f4e93400bc757 Mon Sep 17 00:00:00 2001 From: Jarek Kowalski Date: Fri, 2 Oct 2020 19:48:21 -0700 Subject: [PATCH] Eliminated busy loop after snapshot failure (#658) * server: if a snapshot fails, don't start the next one for 5 minutes or until the next successful refresh. * Makefile: don't print skipped tests --- Makefile | 2 +- internal/server/source_manager.go | 57 ++++++++++++++++++++----------- 2 files changed, 39 insertions(+), 20 deletions(-) diff --git a/Makefile b/Makefile index cf95a8fd3..cc29934a6 100644 --- a/Makefile +++ b/Makefile @@ -15,7 +15,7 @@ endif include tools/tools.mk -GO_TEST=$(gotestsum) --format=pkgname-and-test-fails -- +GO_TEST=$(gotestsum) --format=pkgname-and-test-fails --no-summary=skipped -- LINTER_DEADLINE=300s UNIT_TESTS_TIMEOUT=300s diff --git a/internal/server/source_manager.go b/internal/server/source_manager.go index 1a3294975..15f870cd6 100644 --- a/internal/server/source_manager.go +++ b/internal/server/source_manager.go @@ -5,6 +5,8 @@ "sync" "time" + "github.com/pkg/errors" + "github.com/kopia/kopia/fs/localfs" "github.com/kopia/kopia/internal/clock" "github.com/kopia/kopia/internal/ctxutil" @@ -15,8 +17,10 @@ ) const ( - statusRefreshInterval = 15 * time.Second // how frequently to refresh source status - oneDay = 24 * time.Hour + statusRefreshInterval = 15 * time.Second // how frequently to refresh source status + failedSnapshotRetryInterval = 5 * time.Minute + refreshTimeout = 30 * time.Second // max amount of time to refresh a single source + oneDay = 24 * time.Hour ) // sourceManager manages the state machine of each source @@ -138,12 +142,27 @@ func (s *sourceManager) runLocal(ctx context.Context) { case <-time.After(waitTime): log(ctx).Debugf("snapshotting %v", s.src) - s.snapshot(ctx) - s.refreshStatus(ctx) + + if err := s.snapshot(ctx); err != nil { + log(ctx).Errorf("snapshot error: %v", err) + + s.backoffBeforeNextSnapshot() + } else { + s.refreshStatus(ctx) + } } } } +func (s *sourceManager) backoffBeforeNextSnapshot() { + if s.nextSnapshotTime == nil { + return + } + + t := clock.Now().Add(failedSnapshotRetryInterval) + s.nextSnapshotTime = &t +} + func (s *sourceManager) runRemote(ctx context.Context) { s.refreshStatus(ctx) s.setStatus("REMOTE") @@ -199,7 +218,7 @@ func (s *sourceManager) waitUntilStopped(ctx context.Context) { log(ctx).Debugf("source manager for %v has stopped", s.src) } -func (s *sourceManager) snapshot(ctx context.Context) { +func (s *sourceManager) snapshot(ctx context.Context) error { s.setStatus("PENDING") s.server.beginUpload(ctx, s.src) @@ -211,53 +230,50 @@ func (s *sourceManager) snapshot(ctx context.Context) { select { case <-s.closed: log(ctx).Infof("not snapshotting %v because source manager is shutting down", s.src) - return + return nil default: } localEntry, err := localfs.NewEntry(s.src.Path) if err != nil { - log(ctx).Errorf("unable to create local filesystem: %v", err) - return + return errors.Wrap(err, "unable to create local filesystem") } u := snapshotfs.NewUploader(s.server.rep) policyTree, err := policy.TreeForSource(ctx, s.server.rep, s.src) if err != nil { - log(ctx).Errorf("unable to create policy getter: %v", err) + return errors.Wrap(err, "unable to create policy getter") } u.Progress = s.progress - log(ctx).Infof("starting upload of %v", s.src) + log(ctx).Debugf("starting upload of %v", s.src) s.setUploader(u) manifest, err := u.Upload(ctx, localEntry, policyTree, s.src, s.manifestsSinceLastCompleteSnapshot...) s.setUploader(nil) if err != nil { - log(ctx).Errorf("upload error: %v", err) - return + return errors.Wrap(err, "upload error") } snapshotID, err := snapshot.SaveSnapshot(ctx, s.server.rep, manifest) if err != nil { - log(ctx).Errorf("unable to save snapshot: %v", err) - return + return errors.Wrap(err, "unable to save snapshot") } if _, err := policy.ApplyRetentionPolicy(ctx, s.server.rep, s.src, true); err != nil { - log(ctx).Errorf("unable to apply retention policy: %v", err) - return + return errors.Wrap(err, "unable to apply retention policy") } - log(ctx).Infof("created snapshot %v", snapshotID) + log(ctx).Debugf("created snapshot %v", snapshotID) if err := s.server.rep.Flush(ctx); err != nil { - log(ctx).Errorf("unable to flush: %v", err) - return + return errors.Wrap(err, "unable to flush") } + + return nil } func (s *sourceManager) findClosestNextSnapshotTime() *time.Time { @@ -287,6 +303,9 @@ func (s *sourceManager) findClosestNextSnapshotTime() *time.Time { } func (s *sourceManager) refreshStatus(ctx context.Context) { + ctx, cancel := context.WithTimeout(ctx, refreshTimeout) + defer cancel() + log(ctx).Debugf("refreshing state for %v", s.src) pol, _, err := policy.GetEffectivePolicy(ctx, s.server.rep, s.src)