mirror of
https://github.com/kopia/kopia.git
synced 2026-04-23 15:38:23 -04:00
fix(repository): fixed slow goroutine leak from indexBlobCache, added tests (#1950)
This commit is contained in:
14
cli/app.go
14
cli/app.go
@@ -20,6 +20,7 @@
|
||||
"github.com/kopia/kopia/internal/gather"
|
||||
"github.com/kopia/kopia/internal/memtrack"
|
||||
"github.com/kopia/kopia/internal/passwordpersist"
|
||||
"github.com/kopia/kopia/internal/releasable"
|
||||
"github.com/kopia/kopia/repo"
|
||||
"github.com/kopia/kopia/repo/blob"
|
||||
"github.com/kopia/kopia/repo/logging"
|
||||
@@ -122,6 +123,7 @@ type App struct {
|
||||
disableInternalLog bool
|
||||
AdvancedCommands string
|
||||
cliStorageProviders []StorageProvider
|
||||
trackReleasable []string
|
||||
|
||||
currentAction string
|
||||
onExitCallbacks []func()
|
||||
@@ -231,6 +233,7 @@ func (c *App) setup(app *kingpin.Application) {
|
||||
app.Flag("persist-credentials", "Persist credentials").Default("true").Envar("KOPIA_PERSIST_CREDENTIALS_ON_CONNECT").BoolVar(&c.persistCredentials)
|
||||
app.Flag("disable-internal-log", "Disable internal log").Hidden().Envar("KOPIA_DISABLE_INTERNAL_LOG").BoolVar(&c.disableInternalLog)
|
||||
app.Flag("advanced-commands", "Enable advanced (and potentially dangerous) commands.").Hidden().Envar("KOPIA_ADVANCED_COMMANDS").StringVar(&c.AdvancedCommands)
|
||||
app.Flag("track-releasable", "Enable tracking of releasable resources.").Hidden().Envar("KOPIA_TRACK_RELEASABLE").StringsVar(&c.trackReleasable)
|
||||
|
||||
c.setupOSSpecificKeychainFlags(app)
|
||||
|
||||
@@ -421,6 +424,10 @@ func (c *App) rootContext() context.Context {
|
||||
ctx = logging.WithLogger(ctx, c.loggerFactory)
|
||||
}
|
||||
|
||||
for _, r := range c.trackReleasable {
|
||||
releasable.EnableTracking(releasable.ItemKind(r))
|
||||
}
|
||||
|
||||
return ctx
|
||||
}
|
||||
|
||||
@@ -466,6 +473,13 @@ func (c *App) baseActionWithContext(act func(ctx context.Context) error) func(ct
|
||||
c.osExit(1)
|
||||
}
|
||||
|
||||
if len(c.trackReleasable) > 0 {
|
||||
if err := releasable.Verify(); err != nil {
|
||||
log(ctx0).Warnf("%v", err.Error())
|
||||
c.osExit(1)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
@@ -7,6 +7,8 @@
|
||||
"github.com/kopia/kopia/repo/content"
|
||||
)
|
||||
|
||||
// TestMain hands control of this package's test run to testutil.MyTestMain.
func TestMain(m *testing.M) { testutil.MyTestMain(m) }
|
||||
|
||||
type formatSpecificTestSuite struct {
|
||||
formatFlags []string
|
||||
formatVersion content.FormatVersion
|
||||
|
||||
5
internal/cache/persistent_lru_cache.go
vendored
5
internal/cache/persistent_lru_cache.go
vendored
@@ -14,6 +14,7 @@
|
||||
"github.com/kopia/kopia/internal/clock"
|
||||
"github.com/kopia/kopia/internal/ctxutil"
|
||||
"github.com/kopia/kopia/internal/gather"
|
||||
"github.com/kopia/kopia/internal/releasable"
|
||||
"github.com/kopia/kopia/internal/timetrack"
|
||||
"github.com/kopia/kopia/repo/blob"
|
||||
"github.com/kopia/kopia/repo/logging"
|
||||
@@ -196,6 +197,8 @@ func (c *PersistentCache) Close(ctx context.Context) {
|
||||
log(ctx).Errorf("error during final sweep of the %v: %v", c.description, err)
|
||||
}
|
||||
}
|
||||
|
||||
releasable.Released("persistent-cache", c)
|
||||
}
|
||||
|
||||
func (c *PersistentCache) sweepDirectoryPeriodically(ctx context.Context) {
|
||||
@@ -349,6 +352,8 @@ func NewPersistentCache(ctx context.Context, description string, cacheStorage St
|
||||
return nil, errors.Wrapf(err, "unable to open %v", c.description)
|
||||
}
|
||||
|
||||
releasable.Created("persistent-cache", c)
|
||||
|
||||
c.periodicSweepRunning.Add(1)
|
||||
|
||||
go c.sweepDirectoryPeriodically(ctxutil.Detach(ctx))
|
||||
|
||||
136
internal/releasable/releaseable_tracker.go
Normal file
136
internal/releasable/releaseable_tracker.go
Normal file
@@ -0,0 +1,136 @@
|
||||
// Package releasable allows process-wide tracking of objects that need to be released.
|
||||
package releasable
|
||||
|
||||
import (
	"bytes"
	"fmt"
	"runtime/debug"
	"sort"
	"sync"

	"github.com/pkg/errors"
)
|
||||
|
||||
// ItemKind identifies the kind of a releasable item, e.g. "connection", "cache", etc.
// Tracking is switched on and off per kind (see EnableTracking and DisableTracking).
|
||||
type ItemKind string
|
||||
|
||||
// Created should be called whenever an item is created. If tracking is enabled, it captures the stack trace of
|
||||
// the current goroutine and stores it in a map.
|
||||
func Created(kind ItemKind, itemID interface{}) {
|
||||
getPerKind(kind).created(itemID)
|
||||
}
|
||||
|
||||
// Released should be called whenever an item is released.
|
||||
func Released(kind ItemKind, itemID interface{}) {
|
||||
getPerKind(kind).released(itemID)
|
||||
}
|
||||
|
||||
// Active returns the map of all active items.
|
||||
func Active() map[ItemKind]map[interface{}]string {
|
||||
perKindMutex.Lock()
|
||||
defer perKindMutex.Unlock()
|
||||
|
||||
res := map[ItemKind]map[interface{}]string{}
|
||||
for k, v := range perKindTrackers {
|
||||
res[k] = v.active()
|
||||
}
|
||||
|
||||
return res
|
||||
}
|
||||
|
||||
// Verify returns error if not all releasable resources have been released.
|
||||
func Verify() error {
|
||||
var buf bytes.Buffer
|
||||
|
||||
for itemKind, active := range Active() {
|
||||
if len(active) > 0 {
|
||||
fmt.Fprintf(&buf, "found %v %q resources that have not been released:\n", len(active), itemKind)
|
||||
|
||||
for _, stack := range active {
|
||||
fmt.Fprintf(&buf, " - %v\n", stack)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if buf.Len() == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
return errors.New(buf.String())
|
||||
}
|
||||
|
||||
// perKindTracker keeps track of the active (created but not yet released) items
// of a single kind.
//
// All methods may be called on a nil *perKindTracker, which represents a kind
// for which tracking is disabled: created and released do nothing and active
// returns an empty map.
type perKindTracker struct {
	mu sync.Mutex

	// items maps the ID of each active item to the stack trace captured when it was created.
	// +checklocks:mu
	items map[interface{}]string
}

// created records itemID as active together with the stack trace of the calling goroutine.
// Calling it again with the same itemID overwrites the previously stored stack trace.
func (s *perKindTracker) created(itemID interface{}) {
	if s == nil {
		return
	}

	// Capture the stack before taking the lock so that concurrent callers
	// are not serialized behind the (comparatively slow) stack dump.
	stack := string(debug.Stack())

	s.mu.Lock()
	defer s.mu.Unlock()

	s.items[itemID] = stack
}

// released removes itemID from the set of active items.
// Releasing an ID that is not active is a no-op.
func (s *perKindTracker) released(itemID interface{}) {
	if s == nil {
		return
	}

	s.mu.Lock()
	defer s.mu.Unlock()

	delete(s.items, itemID)
}

// active returns a copy of the active items (ID -> creation stack trace).
// The returned map is never nil and is independent of the tracker's internal state.
func (s *perKindTracker) active() map[interface{}]string {
	// Same nil-receiver contract as created and released.
	if s == nil {
		return map[interface{}]string{}
	}

	s.mu.Lock()
	defer s.mu.Unlock()

	res := make(map[interface{}]string, len(s.items))
	for k, v := range s.items {
		res[k] = v
	}

	return res
}
|
||||
|
||||
var (
|
||||
// perKindMutex guards perKindTrackers.
perKindMutex sync.Mutex // nolint:gochecknoglobals
|
||||
|
||||
// perKindTrackers holds one tracker for every kind that currently has tracking enabled;
// a kind without an entry is not tracked.
// +checklocks:perKindMutex
|
||||
perKindTrackers = map[ItemKind]*perKindTracker{} // nolint:gochecknoglobals
|
||||
)
|
||||
|
||||
// EnableTracking enables tracking of the given item type.
|
||||
func EnableTracking(kind ItemKind) {
|
||||
perKindMutex.Lock()
|
||||
defer perKindMutex.Unlock()
|
||||
|
||||
if perKindTrackers[kind] != nil {
|
||||
return
|
||||
}
|
||||
|
||||
perKindTrackers[kind] = &perKindTracker{
|
||||
items: map[interface{}]string{},
|
||||
}
|
||||
}
|
||||
|
||||
// DisableTracking disables tracking of the given item type.
|
||||
func DisableTracking(kind ItemKind) {
|
||||
perKindMutex.Lock()
|
||||
defer perKindMutex.Unlock()
|
||||
|
||||
delete(perKindTrackers, kind)
|
||||
}
|
||||
|
||||
func getPerKind(kind ItemKind) *perKindTracker {
|
||||
perKindMutex.Lock()
|
||||
defer perKindMutex.Unlock()
|
||||
|
||||
return perKindTrackers[kind]
|
||||
}
|
||||
45
internal/releasable/releaseable_tracker_test.go
Normal file
45
internal/releasable/releaseable_tracker_test.go
Normal file
@@ -0,0 +1,45 @@
|
||||
package releasable_test
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/kopia/kopia/internal/releasable"
|
||||
)
|
||||
|
||||
// TestReleaseable walks a single kind through enable -> create -> release -> disable
// and checks what Active and Verify report at each step.
func TestReleaseable(t *testing.T) {
|
||||
// enabling a kind makes it show up in Active() even before anything is created
releasable.EnableTracking("some-kind")
|
||||
require.Contains(t, releasable.Active(), releasable.ItemKind("some-kind"))
|
||||
|
||||
// each Created adds one active item, each Released removes one
releasable.Created("some-kind", 1)
|
||||
assert.Len(t, releasable.Active()["some-kind"], 1)
|
||||
releasable.Created("some-kind", 2)
|
||||
assert.Len(t, releasable.Active()["some-kind"], 2)
|
||||
releasable.Released("some-kind", 1)
|
||||
assert.Len(t, releasable.Active()["some-kind"], 1)
|
||||
|
||||
// item 2 is still active, so Verify must report it
require.ErrorContains(t, releasable.Verify(), "found 1 \"some-kind\" resources that have not been released")
|
||||
|
||||
releasable.Released("some-kind", 2)
|
||||
assert.Len(t, releasable.Active()["some-kind"], 0)
|
||||
// releasing an already-released item changes nothing
releasable.Released("some-kind", 2)
|
||||
assert.Len(t, releasable.Active()["some-kind"], 0)
|
||||
|
||||
// once disabled, the kind disappears from Active()
releasable.DisableTracking("some-kind")
|
||||
require.NotContains(t, releasable.Active(), releasable.ItemKind("some-kind"))
|
||||
|
||||
require.NoError(t, releasable.Verify())
|
||||
|
||||
// no-ops: tracking is disabled for this kind, so these calls are ignored
|
||||
releasable.Created("some-kind", 1)
|
||||
releasable.Released("some-kind", 2)
|
||||
|
||||
// enabling twice must keep the items recorded after the first enable,
// so both item 1 and item 2 are reported by Verify
releasable.EnableTracking("some-kind")
|
||||
releasable.Created("some-kind", 1)
|
||||
releasable.EnableTracking("some-kind")
|
||||
releasable.Created("some-kind", 2)
|
||||
require.ErrorContains(t, releasable.Verify(), "found 2 \"some-kind\" resources that have not been released")
|
||||
// leave the process-wide tracker state clean
releasable.DisableTracking("some-kind")
|
||||
}
|
||||
@@ -119,7 +119,10 @@ func (e *Environment) setup(tb testing.TB, version content.FormatVersion, opts .
|
||||
tb.Fatal(err)
|
||||
}
|
||||
|
||||
tb.Cleanup(func() { rep.Close(ctx) })
|
||||
tb.Cleanup(func() {
|
||||
e.RepositoryWriter.Close(ctx)
|
||||
rep.Close(ctx)
|
||||
})
|
||||
|
||||
return e
|
||||
}
|
||||
|
||||
@@ -4,6 +4,7 @@
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"log"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"reflect"
|
||||
@@ -12,6 +13,8 @@
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/kopia/kopia/internal/releasable"
|
||||
)
|
||||
|
||||
// ProviderTest marks the test method so that it only runs in provider-tests suite.
|
||||
@@ -101,8 +104,28 @@ func ShouldSkipLongFilenames() bool {
|
||||
|
||||
// MyTestMain runs tests and verifies some post-run invariants.
|
||||
func MyTestMain(m *testing.M) {
|
||||
releasable.EnableTracking("persistent-cache")
|
||||
|
||||
v := m.Run()
|
||||
|
||||
totalLeaked := 0
|
||||
|
||||
for itemKind, active := range releasable.Active() {
|
||||
if len(active) > 0 {
|
||||
log.Printf("found %v leaked %v:", len(active), itemKind)
|
||||
|
||||
for _, stack := range active {
|
||||
log.Println(" - " + stack)
|
||||
}
|
||||
|
||||
totalLeaked++
|
||||
}
|
||||
|
||||
if totalLeaked > 0 {
|
||||
os.Exit(1)
|
||||
}
|
||||
}
|
||||
|
||||
os.Exit(v)
|
||||
}
|
||||
|
||||
|
||||
@@ -86,6 +86,7 @@ type SharedManager struct {
|
||||
|
||||
contentCache cache.ContentCache
|
||||
metadataCache cache.ContentCache
|
||||
indexBlobCache *cache.PersistentCache
|
||||
committedContents *committedContentIndex
|
||||
crypter *Crypter
|
||||
enc *encryptedBlobMgr
|
||||
@@ -468,6 +469,7 @@ func (sm *SharedManager) setupReadManagerCaches(ctx context.Context, caching *Ca
|
||||
// once everything is ready, set it up
|
||||
sm.contentCache = dataCache
|
||||
sm.metadataCache = metadataCache
|
||||
sm.indexBlobCache = indexBlobCache
|
||||
sm.committedContents = newCommittedContentIndex(caching, uint32(sm.crypter.Encryptor.Overhead()), sm.indexVersion, sm.enc.getEncryptedBlob, sm.namedLogger("committed-content-index"), caching.MinIndexSweepAge.DurationOrDefault(DefaultIndexCacheSweepAge))
|
||||
|
||||
return nil
|
||||
@@ -516,6 +518,7 @@ func (sm *SharedManager) release(ctx context.Context) error {
|
||||
|
||||
sm.contentCache.Close(ctx)
|
||||
sm.metadataCache.Close(ctx)
|
||||
sm.indexBlobCache.Close(ctx)
|
||||
|
||||
if sm.internalLogger != nil {
|
||||
sm.internalLogger.Sync() // nolint:errcheck
|
||||
|
||||
@@ -2371,7 +2371,9 @@ func (s *contentManagerSuite) newTestContentManagerWithTweaks(t *testing.T, st b
|
||||
panic("can't create content manager: " + err.Error())
|
||||
}
|
||||
|
||||
t.Cleanup(func() { bm.Close(ctx) })
|
||||
t.Cleanup(func() {
|
||||
bm.Close(ctx)
|
||||
})
|
||||
|
||||
bm.checkInvariantsOnUnlock = true
|
||||
|
||||
|
||||
Reference in New Issue
Block a user