From 1ae6c6df035f551dff4a2905b5facdbfd7bc15fb Mon Sep 17 00:00:00 2001 From: Jarek Kowalski Date: Sun, 15 May 2022 18:21:30 -0700 Subject: [PATCH] fix(repository): fixed slow goroutine leak from indexBlobCache, added tests (#1950) --- cli/app.go | 14 ++ cli/suite_test.go | 2 + internal/cache/persistent_lru_cache.go | 5 + internal/releasable/releaseable_tracker.go | 136 ++++++++++++++++++ .../releasable/releaseable_tracker_test.go | 45 ++++++ internal/repotesting/repotesting.go | 5 +- internal/testutil/testutil.go | 23 +++ repo/content/committed_read_manager.go | 3 + repo/content/content_manager_test.go | 4 +- 9 files changed, 235 insertions(+), 2 deletions(-) create mode 100644 internal/releasable/releaseable_tracker.go create mode 100644 internal/releasable/releaseable_tracker_test.go diff --git a/cli/app.go b/cli/app.go index adda35679..253743667 100644 --- a/cli/app.go +++ b/cli/app.go @@ -20,6 +20,7 @@ "github.com/kopia/kopia/internal/gather" "github.com/kopia/kopia/internal/memtrack" "github.com/kopia/kopia/internal/passwordpersist" + "github.com/kopia/kopia/internal/releasable" "github.com/kopia/kopia/repo" "github.com/kopia/kopia/repo/blob" "github.com/kopia/kopia/repo/logging" @@ -122,6 +123,7 @@ type App struct { disableInternalLog bool AdvancedCommands string cliStorageProviders []StorageProvider + trackReleasable []string currentAction string onExitCallbacks []func() @@ -231,6 +233,7 @@ func (c *App) setup(app *kingpin.Application) { app.Flag("persist-credentials", "Persist credentials").Default("true").Envar("KOPIA_PERSIST_CREDENTIALS_ON_CONNECT").BoolVar(&c.persistCredentials) app.Flag("disable-internal-log", "Disable internal log").Hidden().Envar("KOPIA_DISABLE_INTERNAL_LOG").BoolVar(&c.disableInternalLog) app.Flag("advanced-commands", "Enable advanced (and potentially dangerous) commands.").Hidden().Envar("KOPIA_ADVANCED_COMMANDS").StringVar(&c.AdvancedCommands) + app.Flag("track-releasable", "Enable tracking of releasable resources.").Hidden().Envar("KOPIA_TRACK_RELEASABLE").StringsVar(&c.trackReleasable) c.setupOSSpecificKeychainFlags(app) @@ -421,6 +424,10 @@ func (c *App) rootContext() context.Context { ctx = logging.WithLogger(ctx, c.loggerFactory) } + for _, r := range c.trackReleasable { + releasable.EnableTracking(releasable.ItemKind(r)) + } + return ctx } @@ -466,6 +473,13 @@ func (c *App) baseActionWithContext(act func(ctx context.Context) error) func(ct c.osExit(1) } + if len(c.trackReleasable) > 0 { + if err := releasable.Verify(); err != nil { + log(ctx0).Warnf("%v", err.Error()) + c.osExit(1) + } + } + return nil } } diff --git a/cli/suite_test.go b/cli/suite_test.go index 592e68118..41fac4cba 100644 --- a/cli/suite_test.go +++ b/cli/suite_test.go @@ -7,6 +7,8 @@ "github.com/kopia/kopia/repo/content" ) +func TestMain(m *testing.M) { testutil.MyTestMain(m) } + type formatSpecificTestSuite struct { formatFlags []string formatVersion content.FormatVersion diff --git a/internal/cache/persistent_lru_cache.go b/internal/cache/persistent_lru_cache.go index 01ef5b666..281b77361 100644 --- a/internal/cache/persistent_lru_cache.go +++ b/internal/cache/persistent_lru_cache.go @@ -14,6 +14,7 @@ "github.com/kopia/kopia/internal/clock" "github.com/kopia/kopia/internal/ctxutil" "github.com/kopia/kopia/internal/gather" + "github.com/kopia/kopia/internal/releasable" "github.com/kopia/kopia/internal/timetrack" "github.com/kopia/kopia/repo/blob" "github.com/kopia/kopia/repo/logging" @@ -196,6 +197,8 @@ func (c *PersistentCache) Close(ctx context.Context) { log(ctx).Errorf("error during final sweep of the %v: %v", c.description, err) } } + + releasable.Released("persistent-cache", c) } func (c *PersistentCache) sweepDirectoryPeriodically(ctx context.Context) { @@ -349,6 +352,8 @@ func NewPersistentCache(ctx context.Context, description string, cacheStorage St return nil, errors.Wrapf(err, "unable to open %v", c.description) } + releasable.Created("persistent-cache", c) + c.periodicSweepRunning.Add(1) go c.sweepDirectoryPeriodically(ctxutil.Detach(ctx)) diff --git a/internal/releasable/releaseable_tracker.go b/internal/releasable/releaseable_tracker.go new file mode 100644 index 000000000..639d4d13e --- /dev/null +++ b/internal/releasable/releaseable_tracker.go @@ -0,0 +1,136 @@ +// Package releasable allows process-wide tracking of objects that need to be released. +package releasable + +import ( + "bytes" + "fmt" + "runtime/debug" + "sync" + + "github.com/pkg/errors" +) + +// ItemKind identifies the kind of a releasable item, e.g. "connection", "cache", etc. +type ItemKind string + +// Created should be called whenever an item is created. If tracking is enabled, it captures the stack trace of +// the current goroutine and stores it in a map. +func Created(kind ItemKind, itemID interface{}) { + getPerKind(kind).created(itemID) +} + +// Released should be called whenever an item is released. +func Released(kind ItemKind, itemID interface{}) { + getPerKind(kind).released(itemID) +} + +// Active returns the map of all active items. +func Active() map[ItemKind]map[interface{}]string { + perKindMutex.Lock() + defer perKindMutex.Unlock() + + res := map[ItemKind]map[interface{}]string{} + for k, v := range perKindTrackers { + res[k] = v.active() + } + + return res +} + +// Verify returns error if not all releasable resources have been released. +func Verify() error { + var buf bytes.Buffer + + for itemKind, active := range Active() { + if len(active) > 0 { + fmt.Fprintf(&buf, "found %v %q resources that have not been released:\n", len(active), itemKind) + + for _, stack := range active { + fmt.Fprintf(&buf, " - %v\n", stack) + } + } + } + + if buf.Len() == 0 { + return nil + } + + return errors.New(buf.String()) +} + +type perKindTracker struct { + mu sync.Mutex + + // +checklocks:mu + items map[interface{}]string +} + +func (s *perKindTracker) created(itemID interface{}) { + if s == nil { + return + } + + s.mu.Lock() + defer s.mu.Unlock() + + s.items[itemID] = string(debug.Stack()) +} + +func (s *perKindTracker) released(itemID interface{}) { + if s == nil { + return + } + + s.mu.Lock() + defer s.mu.Unlock() + + delete(s.items, itemID) +} + +func (s *perKindTracker) active() map[interface{}]string { + s.mu.Lock() + defer s.mu.Unlock() + + res := map[interface{}]string{} + for k, v := range s.items { + res[k] = v + } + + return res +} + +var ( + perKindMutex sync.Mutex // nolint:gochecknoglobals + + // +checklocks:perKindMutex + perKindTrackers = map[ItemKind]*perKindTracker{} // nolint:gochecknoglobals +) + +// EnableTracking enables tracking of the given item type. +func EnableTracking(kind ItemKind) { + perKindMutex.Lock() + defer perKindMutex.Unlock() + + if perKindTrackers[kind] != nil { + return + } + + perKindTrackers[kind] = &perKindTracker{ + items: map[interface{}]string{}, + } +} + +// DisableTracking disables tracking of the given item type. +func DisableTracking(kind ItemKind) { + perKindMutex.Lock() + defer perKindMutex.Unlock() + + delete(perKindTrackers, kind) +} + +func getPerKind(kind ItemKind) *perKindTracker { + perKindMutex.Lock() + defer perKindMutex.Unlock() + + return perKindTrackers[kind] +} diff --git a/internal/releasable/releaseable_tracker_test.go b/internal/releasable/releaseable_tracker_test.go new file mode 100644 index 000000000..7e04338f6 --- /dev/null +++ b/internal/releasable/releaseable_tracker_test.go @@ -0,0 +1,45 @@ +package releasable_test + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/kopia/kopia/internal/releasable" +) + +func TestReleaseable(t *testing.T) { + releasable.EnableTracking("some-kind") + require.Contains(t, releasable.Active(), releasable.ItemKind("some-kind")) + + releasable.Created("some-kind", 1) + assert.Len(t, releasable.Active()["some-kind"], 1) + releasable.Created("some-kind", 2) + assert.Len(t, releasable.Active()["some-kind"], 2) + releasable.Released("some-kind", 1) + assert.Len(t, releasable.Active()["some-kind"], 1) + + require.ErrorContains(t, releasable.Verify(), "found 1 \"some-kind\" resources that have not been released") + + releasable.Released("some-kind", 2) + assert.Len(t, releasable.Active()["some-kind"], 0) + releasable.Released("some-kind", 2) + assert.Len(t, releasable.Active()["some-kind"], 0) + + releasable.DisableTracking("some-kind") + require.NotContains(t, releasable.Active(), releasable.ItemKind("some-kind")) + + require.NoError(t, releasable.Verify()) + + // no-ops + releasable.Created("some-kind", 1) + releasable.Released("some-kind", 2) + + releasable.EnableTracking("some-kind") + releasable.Created("some-kind", 1) + releasable.EnableTracking("some-kind") + releasable.Created("some-kind", 2) + require.ErrorContains(t, releasable.Verify(), "found 2 \"some-kind\" resources that have not been released") + releasable.DisableTracking("some-kind") +} diff --git a/internal/repotesting/repotesting.go b/internal/repotesting/repotesting.go index 66d5f67ca..6f15b1246 100644 --- a/internal/repotesting/repotesting.go +++ b/internal/repotesting/repotesting.go @@ -119,7 +119,10 @@ func (e *Environment) setup(tb testing.TB, version content.FormatVersion, opts . tb.Fatal(err) } - tb.Cleanup(func() { rep.Close(ctx) }) + tb.Cleanup(func() { + e.RepositoryWriter.Close(ctx) + rep.Close(ctx) + }) return e } diff --git a/internal/testutil/testutil.go b/internal/testutil/testutil.go index 0b90647c4..6f2fcf26f 100644 --- a/internal/testutil/testutil.go +++ b/internal/testutil/testutil.go @@ -4,6 +4,7 @@ import ( "encoding/json" "fmt" + "log" "os" "path/filepath" "reflect" @@ -12,6 +13,8 @@ "testing" "github.com/stretchr/testify/require" + + "github.com/kopia/kopia/internal/releasable" ) // ProviderTest marks the test method so that it only runs in provider-tests suite. @@ -101,8 +104,28 @@ func ShouldSkipLongFilenames() bool { // MyTestMain runs tests and verifies some post-run invariants. func MyTestMain(m *testing.M) { + releasable.EnableTracking("persistent-cache") + v := m.Run() + totalLeaked := 0 + + for itemKind, active := range releasable.Active() { + if len(active) > 0 { + log.Printf("found %v leaked %v:", len(active), itemKind) + + for _, stack := range active { + log.Println(" - " + stack) + } + + totalLeaked++ + } + + if totalLeaked > 0 { + os.Exit(1) + } + } + os.Exit(v) } diff --git a/repo/content/committed_read_manager.go b/repo/content/committed_read_manager.go index 453267c6b..e88a9203a 100644 --- a/repo/content/committed_read_manager.go +++ b/repo/content/committed_read_manager.go @@ -86,6 +86,7 @@ type SharedManager struct { contentCache cache.ContentCache metadataCache cache.ContentCache + indexBlobCache *cache.PersistentCache committedContents *committedContentIndex crypter *Crypter enc *encryptedBlobMgr @@ -468,6 +469,7 @@ func (sm *SharedManager) setupReadManagerCaches(ctx context.Context, caching *Ca // once everything is ready, set it up sm.contentCache = dataCache sm.metadataCache = metadataCache + sm.indexBlobCache = indexBlobCache sm.committedContents = newCommittedContentIndex(caching, uint32(sm.crypter.Encryptor.Overhead()), sm.indexVersion, sm.enc.getEncryptedBlob, sm.namedLogger("committed-content-index"), caching.MinIndexSweepAge.DurationOrDefault(DefaultIndexCacheSweepAge)) return nil @@ -516,6 +518,7 @@ func (sm *SharedManager) release(ctx context.Context) error { sm.contentCache.Close(ctx) sm.metadataCache.Close(ctx) + sm.indexBlobCache.Close(ctx) if sm.internalLogger != nil { sm.internalLogger.Sync() // nolint:errcheck diff --git a/repo/content/content_manager_test.go b/repo/content/content_manager_test.go index 9e00f4813..f05b4b69e 100644 --- a/repo/content/content_manager_test.go +++ b/repo/content/content_manager_test.go @@ -2371,7 +2371,9 @@ func (s *contentManagerSuite) newTestContentManagerWithTweaks(t *testing.T, st b panic("can't create content manager: " + err.Error()) } - t.Cleanup(func() { bm.Close(ctx) }) + t.Cleanup(func() { + bm.Close(ctx) + }) bm.checkInvariantsOnUnlock = true