diff --git a/Makefile b/Makefile index 654d6058f..17b6ea5ad 100644 --- a/Makefile +++ b/Makefile @@ -227,6 +227,7 @@ $(TESTING_ACTION_EXE): tests/testingaction/main.go integration-tests: export KOPIA_EXE ?= $(KOPIA_INTEGRATION_EXE) integration-tests: export KOPIA_08_EXE=$(kopia08) +integration-tests: export KOPIA_TRACK_CHUNK_ALLOC=1 integration-tests: export TESTING_ACTION_EXE ?= $(TESTING_ACTION_EXE) integration-tests: GOTESTSUM_FLAGS=--format=testname --no-summary=skipped --jsonfile=.tmp.integration-tests.json integration-tests: build-integration-test-binary $(gotestsum) $(TESTING_ACTION_EXE) $(kopia08) @@ -242,6 +243,7 @@ compat-tests: $(kopia_ui_embedded_exe) $(kopia08) $(gotestsum) endurance-tests: export KOPIA_EXE ?= $(KOPIA_INTEGRATION_EXE) endurance-tests: export KOPIA_LOGS_DIR=$(CURDIR)/.logs +endurance-tests: export KOPIA_TRACK_CHUNK_ALLOC=1 endurance-tests: build-integration-test-binary $(gotestsum) go test $(TEST_FLAGS) -count=$(REPEAT_TEST) -parallel $(PARALLEL) -timeout 3600s github.com/kopia/kopia/tests/endurance_test diff --git a/internal/gather/gather_write_buffer_chunk.go b/internal/gather/gather_write_buffer_chunk.go index 276e1d123..9eaf4c834 100644 --- a/internal/gather/gather_write_buffer_chunk.go +++ b/internal/gather/gather_write_buffer_chunk.go @@ -2,12 +2,25 @@ import ( "context" + "fmt" + "os" + "reflect" + "runtime" + "strings" "sync" + "unsafe" "github.com/alecthomas/units" ) +const ( + mynamespace = "kopia/kopia/internal/gather" + maxCallersToTrackAllocations = 3 +) + var ( + trackChunkAllocations = os.Getenv("KOPIA_TRACK_CHUNK_ALLOC") != "" + defaultAllocator = &chunkAllocator{ name: "default", chunkSize: 1 << 16, // nolint:gomnd @@ -32,6 +45,37 @@ type chunkAllocator struct { allocHighWaterMark int allocated int freed int + activeChunks map[uintptr]string +} + +func (a *chunkAllocator) trackAlloc(v []byte) []byte { + if trackChunkAllocations { + var ( + pcbuf [8]uintptr + callerFrames []string + ) + + n := runtime.Callers(maxCallersToTrackAllocations, pcbuf[:]) + frames := runtime.CallersFrames(pcbuf[0:n]) + + for f, ok := frames.Next(); ok; f, ok = frames.Next() { + fn := fmt.Sprintf("%v %v:%v", f.Func.Name(), f.File, f.Line) + + if fn != "" && !strings.Contains(fn, mynamespace) { + callerFrames = append(callerFrames, fn) + } + } + + hdr := (*reflect.SliceHeader)(unsafe.Pointer(&v)) //nolint:gosec + + if a.activeChunks == nil { + a.activeChunks = map[uintptr]string{} + } + + a.activeChunks[hdr.Data] = strings.Join(callerFrames, "\n") + } + + return v } func (a *chunkAllocator) allocChunk() []byte { @@ -46,13 +90,13 @@ func (a *chunkAllocator) allocChunk() []byte { l := len(a.freeList) if l == 0 { - return make([]byte, 0, a.chunkSize) + return a.trackAlloc(make([]byte, 0, a.chunkSize)) } ch := a.freeList[l-1] a.freeList = a.freeList[0 : l-1] - return ch + return a.trackAlloc(ch) } func (a *chunkAllocator) releaseChunk(s []byte) { @@ -63,6 +107,11 @@ func (a *chunkAllocator) releaseChunk(s []byte) { a.mu.Lock() defer a.mu.Unlock() + if a.activeChunks != nil { + hdr := (*reflect.SliceHeader)(unsafe.Pointer(&s)) //nolint:gosec + delete(a.activeChunks, hdr.Data) + } + a.freed++ if len(a.freeList) < a.maxFreeListSize { @@ -78,10 +127,26 @@ func (a *chunkAllocator) dumpStats(ctx context.Context, prefix string) { a.mu.Lock() defer a.mu.Unlock() - log(ctx).Debugf("%v (%v) - allocated %v chunks freed %v alive %v max %v free list high water mark: %v", + method := log(ctx).Debugf + + alive := a.allocated - a.freed + if alive > 0 { + method = log(ctx).Errorf + } + + method("%v (%v) - allocated %v chunks freed %v alive %v max %v free list high water mark: %v", prefix, units.Base2Bytes(int64(a.chunkSize)), - a.allocated, a.freed, a.allocated-a.freed, a.allocHighWaterMark, a.freeListHighWaterMark) + a.allocated, a.freed, alive, a.allocHighWaterMark, a.freeListHighWaterMark) + + for _, v := range a.activeChunks { + method("leaked chunk from %v", v) + } + + if trackChunkAllocations && len(a.activeChunks) > 0 { + // nolint:gocritic + os.Exit(1) + } } // DumpStats logs the allocator statistics. diff --git a/repo/content/builder.go b/repo/content/builder.go index 77e018251..78ce0c732 100644 --- a/repo/content/builder.go +++ b/repo/content/builder.go @@ -200,6 +200,8 @@ func (b packIndexBuilder) buildShards(indexVersion int, stable bool, shardSize i for _, s := range shardedBuilders { buf := gather.NewWriteBuffer() + dataShardsBuf = append(dataShardsBuf, buf) + if err := s.BuildStable(buf, indexVersion); err != nil { closeShards() diff --git a/repo/content/internal_logger.go b/repo/content/internal_logger.go index 1c1b1d6e7..7d5f1f582 100644 --- a/repo/content/internal_logger.go +++ b/repo/content/internal_logger.go @@ -119,10 +119,12 @@ func (l *internalLogger) add(level, msg string, args []interface{}) { func (l *internalLogger) maybeEncryptAndWriteChunkUnlocked(data gather.Bytes, closeFunc func()) { if data.Length() == 0 { + closeFunc() return } if atomic.LoadInt32(&l.enabled) == 0 { + closeFunc() return }