From daa62de3e48c3314f827e9100d760c94ca7e4221 Mon Sep 17 00:00:00 2001 From: Jarek Kowalski Date: Sat, 19 Mar 2022 22:42:59 -0700 Subject: [PATCH] chore(ci): added checklocks static analyzer (#1838) From https://github.com/google/gvisor/tree/master/tools/checklocks This will perform static verification that we're using `sync.Mutex`, `sync.RWMutex` and `atomic` correctly to guard access to certain fields. This was mostly just a matter of adding annotations to indicate which fields are guarded by which mutex. In a handful of places the code had to be refactored to allow static analyzer to do its job better or to not be confused by some constructs. In one place this actually uncovered a bug where a function was not releasing a lock properly in an error case. The check is part of `make lint` but can also be invoked by `make check-locks`. --- Makefile | 11 +- cli/cli_progress.go | 45 ++++--- cli/command_content_stats.go | 4 +- cli/command_snapshot_list.go | 11 +- cli/command_snapshot_verify.go | 10 +- fs/cachefs/cache_test.go | 5 +- internal/auth/authn_repo.go | 11 +- internal/auth/authz_acl.go | 11 +- internal/blobtesting/eventually_consistent.go | 1 + internal/blobtesting/map.go | 5 +- internal/blobtesting/object_locking_map.go | 5 +- .../cache/content_cache_concurrency_test.go | 6 +- internal/cache/persistent_lru_cache.go | 1 + internal/connection/reconnector.go | 3 +- internal/connection/reconnector_test.go | 4 +- internal/faketime/faketime.go | 4 +- internal/fusemount/fusefs.go | 11 +- internal/gather/gather_write_buffer.go | 2 +- internal/gather/gather_write_buffer_chunk.go | 27 +++-- internal/iocopy/copy.go | 6 +- internal/logfile/logfile.go | 22 +++- internal/memtrack/memtrack.go | 25 +++- internal/ownwrites/ownwrites.go | 4 +- .../providervalidation/providervalidation.go | 9 +- internal/stats/countsum.go | 4 +- internal/stats/countsum_mutex.go | 6 +- internal/uitask/uitask.go | 8 +- internal/uitask/uitask_manager.go | 13 +- internal/webdavmount/webdavmount.go | 7 +- internal/workshare/workshare_pool.go | 1 + repo/blob/filesystem/osinterface_mock_test.go | 43 ++++--- repo/blob/gdrive/file_id_cache.go | 44 ++++--- repo/blob/logging/logging_storage.go | 4 +- repo/blob/s3/s3_storage_test.go | 1 + repo/blob/sharded/sharded.go | 6 +- repo/blob/throttling/throttler.go | 5 +- repo/content/committed_content_index.go | 24 ++-- .../committed_content_index_mem_cache.go | 9 +- repo/content/committed_read_manager.go | 9 +- repo/content/content_index_recovery.go | 26 ++-- repo/content/content_manager.go | 111 ++++++++++++------ repo/content/internal_logger.go | 21 ++-- repo/content/stats.go | 42 +++---- repo/grpc_repository_client.go | 19 ++- repo/manifest/committed_manifest_manager.go | 25 +++- repo/manifest/manifest_manager.go | 3 +- repo/object/object_manager_test.go | 10 +- repo/object/object_writer.go | 3 +- repo/recently_read.go | 9 +- snapshot/manifest.go | 6 + snapshot/restore/restore.go | 25 ++-- snapshot/snapshotfs/checkpoint_registry.go | 1 + snapshot/snapshotfs/estimate.go | 17 +-- snapshot/snapshotfs/snapshot_tree_walker.go | 6 +- snapshot/snapshotfs/upload.go | 19 ++- snapshot/snapshotfs/upload_progress.go | 25 ++-- snapshot/snapshotfs/upload_scan.go | 5 +- snapshot/snapshotfs/upload_test.go | 21 ++-- tests/robustness/checker/checker.go | 2 +- .../multiclient_test/framework/filewriter.go | 4 +- tests/robustness/pathlock/path_lock.go | 4 +- tests/tools/kopiaclient/kopiaclient.go | 3 +- tools/tools.mk | 9 ++ 63 files changed, 572 insertions(+), 271 deletions(-) diff --git a/Makefile b/Makefile index 82363dc03..69a381e73 100644 --- a/Makefile +++ b/Makefile @@ -64,7 +64,14 @@ install-noui: install install-race: go install -race $(KOPIA_BUILD_FLAGS) -tags "$(KOPIA_BUILD_TAGS)" -lint: $(linter) +check-locks: $(checklocks) +ifneq ($(GOOS)/$(GOARCH),linux/arm64) +ifneq ($(GOOS)/$(GOARCH),linux/arm) + go vet -vettool=$(checklocks) ./... +endif +endif + +lint: $(linter) check-locks ifneq ($(GOOS)/$(GOARCH),linux/arm64) ifneq ($(GOOS)/$(GOARCH),linux/arm) $(linter) --deadline $(LINTER_DEADLINE) run $(linter_flags) @@ -171,7 +178,7 @@ download-rclone: go run ./tools/gettool --tool rclone:$(RCLONE_VERSION) --output-dir dist/kopia_linux_arm_6/ --goos=linux --goarch=arm -ci-tests: lint vet test +ci-tests: lint vet test ci-integration-tests: $(MAKE) integration-tests diff --git a/cli/cli_progress.go b/cli/cli_progress.go index fcd918ac5..4afb17640 100644 --- a/cli/cli_progress.go +++ b/cli/cli_progress.go @@ -35,33 +35,47 @@ type cliProgress struct { snapshotfs.NullUploadProgress // all int64 must precede all int32 due to alignment requirements on ARM - uploadedBytes int64 - cachedBytes int64 + // +checkatomic + uploadedBytes int64 + // +checkatomic + cachedBytes int64 + // +checkatomic hashedBytes int64 outputThrottle timetrack.Throttle // is int64 - cachedFiles int32 + // +checkatomic + cachedFiles int32 + // +checkatomic inProgressHashing int32 - hashedFiles int32 - uploadedFiles int32 + // +checkatomic + hashedFiles int32 + // +checkatomic + uploadedFiles int32 + // +checkatomic ignoredErrorCount int32 - fatalErrorCount int32 + // +checkatomic + fatalErrorCount int32 - uploading int32 + // +checkatomic + uploading int32 + // +checkatomic uploadFinished int32 - lastLineLength int - spinPhase int - uploadStartTime timetrack.Estimator + outputMutex sync.Mutex - estimatedFileCount int - estimatedTotalBytes int64 + // +checklocks:outputMutex + lastLineLength int + // +checklocks:outputMutex + spinPhase int + + uploadStartTime timetrack.Estimator // +checklocksignore + + estimatedFileCount int // +checklocksignore + estimatedTotalBytes int64 // +checklocksignore // indicates shared instance that does not reset counters at the beginning of upload. shared bool - outputMutex sync.Mutex - progressFlags } @@ -181,6 +195,7 @@ func (p *cliProgress) output(col *color.Color, msg string) { p.out.printStderr("\r%v%v", line, extraSpaces) } +// +checklocks:p.outputMutex func (p *cliProgress) spinnerCharacter() string { if atomic.LoadInt32(&p.uploadFinished) == 1 { return "*" @@ -193,6 +208,7 @@ func (p *cliProgress) spinnerCharacter() string { return s } +// +checklocksignore. func (p *cliProgress) StartShared() { *p = cliProgress{ uploading: 1, @@ -207,6 +223,7 @@ func (p *cliProgress) FinishShared() { p.output(defaultColor, "") } +// +checklocksignore. func (p *cliProgress) UploadStarted() { if p.shared { // do nothing diff --git a/cli/command_content_stats.go b/cli/command_content_stats.go index 8cabc37f5..a089c6f61 100644 --- a/cli/command_content_stats.go +++ b/cli/command_content_stats.go @@ -27,7 +27,9 @@ func (c *commandContentStats) setup(svc appServices, parent commandParent) { } type contentStatsTotals struct { - originalSize, packedSize, count int64 + originalSize int64 + packedSize int64 + count int64 } func (c *commandContentStats) run(ctx context.Context, rep repo.DirectRepository) error { diff --git a/cli/command_snapshot_list.go b/cli/command_snapshot_list.go index b1cd20be2..a64f1714b 100644 --- a/cli/command_snapshot_list.go +++ b/cli/command_snapshot_list.go @@ -5,6 +5,7 @@ "fmt" "path/filepath" "strings" + "sync/atomic" "time" "github.com/fatih/color" @@ -276,7 +277,7 @@ func (c *commandSnapshotList) outputManifestFromSingleSource(ctx context.Context }) if m.IncompleteReason == "" { - lastTotalFileSize = m.Stats.TotalFileSize + lastTotalFileSize = atomic.LoadInt64(&m.Stats.TotalFileSize) } return nil @@ -390,10 +391,10 @@ func (c *commandSnapshotList) entryBits(ctx context.Context, m *snapshot.Manifes if u := m.StorageStats; u != nil { bits = append(bits, - fmt.Sprintf("new-data:%v", units.BytesStringBase10(u.NewData.PackedContentBytes)), - fmt.Sprintf("new-files:%v", int64(u.NewData.FileObjectCount)), - fmt.Sprintf("new-dirs:%v", int64(u.NewData.DirObjectCount)), - fmt.Sprintf("compression:%v", formatCompressionPercentage(u.NewData.OriginalContentBytes, u.NewData.PackedContentBytes)), + fmt.Sprintf("new-data:%v", units.BytesStringBase10(atomic.LoadInt64(&u.NewData.PackedContentBytes))), + fmt.Sprintf("new-files:%v", atomic.LoadInt32(&u.NewData.FileObjectCount)), + fmt.Sprintf("new-dirs:%v", atomic.LoadInt32(&u.NewData.DirObjectCount)), + fmt.Sprintf("compression:%v", formatCompressionPercentage(atomic.LoadInt64(&u.NewData.OriginalContentBytes), atomic.LoadInt64(&u.NewData.PackedContentBytes))), ) } diff --git a/cli/command_snapshot_verify.go b/cli/command_snapshot_verify.go index a3c452f31..b9d8af04d 100644 --- a/cli/command_snapshot_verify.go +++ b/cli/command_snapshot_verify.go @@ -58,8 +58,10 @@ func (c *commandSnapshotVerify) setup(svc appServices, parent commandParent) { } type verifier struct { - throttle timetrack.Throttle - queued int32 + throttle timetrack.Throttle + // +checkatomic + queued int32 + // +checkatomic processed int32 fileWorkItems chan verifyFileWorkItem @@ -80,7 +82,9 @@ func (v *verifier) showStats(ctx context.Context) { func (v *verifier) verifyFile(ctx context.Context, oid object.ID, entryPath string) error { log(ctx).Debugf("verifying object %v", oid) - defer atomic.AddInt32(&v.processed, 1) + defer func() { + atomic.AddInt32(&v.processed, 1) + }() contentIDs, err := v.rep.VerifyObject(ctx, oid) if err != nil { diff --git a/fs/cachefs/cache_test.go b/fs/cachefs/cache_test.go index 86f82d9d0..dd06c87ca 100644 --- a/fs/cachefs/cache_test.go +++ b/fs/cachefs/cache_test.go @@ -141,15 +141,18 @@ func (cv *cacheVerifier) reset() { } type lockState struct { - l sync.Locker + l sync.Locker + // +checkatomic locked int32 } +// +checklocksacquire:ls.l func (ls *lockState) Lock() { ls.l.Lock() atomic.AddInt32(&ls.locked, 1) } +// +checklocksrelease:ls.l func (ls *lockState) Unlock() { atomic.AddInt32(&ls.locked, -1) ls.l.Unlock() diff --git a/internal/auth/authn_repo.go b/internal/auth/authn_repo.go index 996ce3ae5..9df1373dd 100644 --- a/internal/auth/authn_repo.go +++ b/internal/auth/authn_repo.go @@ -13,11 +13,14 @@ const defaultProfileRefreshFrequency = 10 * time.Second type repositoryUserAuthenticator struct { + mu sync.Mutex + // +checklocks:mu lastRep repo.Repository - - mu sync.Mutex - nextRefreshTime time.Time - userProfiles map[string]*user.Profile + // +checklocks:mu + nextRefreshTime time.Time + // +checklocks:mu + userProfiles map[string]*user.Profile + // +checklocks:mu userProfileRefreshFrequency time.Duration } diff --git a/internal/auth/authz_acl.go b/internal/auth/authz_acl.go index 134d72358..e84c01076 100644 --- a/internal/auth/authz_acl.go +++ b/internal/auth/authz_acl.go @@ -86,12 +86,15 @@ } type aclCache struct { - aclRefreshFrequency time.Duration + aclRefreshFrequency time.Duration // +checklocksignore - mu sync.Mutex - lastRep repo.Repository + mu sync.Mutex + // +checklocks:mu + lastRep repo.Repository + // +checklocks:mu nextRefreshTime time.Time - aclEntries []*acl.Entry + // +checklocks:mu + aclEntries []*acl.Entry } // Authorize returns authorization info based on ACLs stored in the repository falling back to legacy authorizer diff --git a/internal/blobtesting/eventually_consistent.go b/internal/blobtesting/eventually_consistent.go index 3ad16cf46..db6553c3f 100644 --- a/internal/blobtesting/eventually_consistent.go +++ b/internal/blobtesting/eventually_consistent.go @@ -88,6 +88,7 @@ type eventuallyConsistentStorage struct { recentlyDeleted sync.Map listSettleTime time.Duration + // +checklocks:mu caches []*ecFrontendCache realStorage blob.Storage timeNow func() time.Time diff --git a/internal/blobtesting/map.go b/internal/blobtesting/map.go index 476cb8026..a59939d29 100644 --- a/internal/blobtesting/map.go +++ b/internal/blobtesting/map.go @@ -18,8 +18,11 @@ type DataMap map[blob.ID][]byte type mapStorage struct { - data DataMap + // +checklocks:mutex + data DataMap + // +checklocks:mutex keyTime map[blob.ID]time.Time + // +checklocks:mutex timeNow func() time.Time mutex sync.RWMutex } diff --git a/internal/blobtesting/object_locking_map.go b/internal/blobtesting/object_locking_map.go index 8a844ca6a..5ce164b88 100644 --- a/internal/blobtesting/object_locking_map.go +++ b/internal/blobtesting/object_locking_map.go @@ -29,11 +29,13 @@ type entry struct { // marker. This struct manages the retention time of each blob throug hte // PutBlob options. type objectLockingMap struct { + // +checklocks:mutex data versionedEntries - timeNow func() time.Time + timeNow func() time.Time // +checklocksignore mutex sync.RWMutex } +// +checklocksread:s.mutex func (s *objectLockingMap) getLatestByID(id blob.ID) (*entry, error) { versions, ok := s.data[id] if !ok { @@ -50,6 +52,7 @@ func (s *objectLockingMap) getLatestByID(id blob.ID) (*entry, error) { return e, nil } +// +checklocksread:s.mutex func (s *objectLockingMap) getLatestForMutationLocked(id blob.ID) (*entry, error) { e, err := s.getLatestByID(id) if err != nil { diff --git a/internal/cache/content_cache_concurrency_test.go b/internal/cache/content_cache_concurrency_test.go index e8d8237e0..ad21fa3ea 100644 --- a/internal/cache/content_cache_concurrency_test.go +++ b/internal/cache/content_cache_concurrency_test.go @@ -279,8 +279,10 @@ func testGetContentRaceFetchesOnce(t *testing.T, newCache newContentCacheFunc) { type concurrencyTester struct { mu sync.Mutex - concurrencyLevel int - maxConcurrencyLevel int + // +checklocks:mu + concurrencyLevel int + + maxConcurrencyLevel int // +checklocksignore } func (c *concurrencyTester) enter() { diff --git a/internal/cache/persistent_lru_cache.go b/internal/cache/persistent_lru_cache.go index 96dabdb68..01ef5b666 100644 --- a/internal/cache/persistent_lru_cache.go +++ b/internal/cache/persistent_lru_cache.go @@ -37,6 +37,7 @@ // PersistentCache provides persistent on-disk cache. type PersistentCache struct { + // +checkatomic anyChange int32 cacheStorage Storage diff --git a/internal/connection/reconnector.go b/internal/connection/reconnector.go index 5547f761b..f606e2712 100644 --- a/internal/connection/reconnector.go +++ b/internal/connection/reconnector.go @@ -31,7 +31,8 @@ type ConnectorImpl interface { type Reconnector struct { connector ConnectorImpl - mu sync.Mutex + mu sync.Mutex + // +checklocks:mu activeConnection Connection } diff --git a/internal/connection/reconnector_test.go b/internal/connection/reconnector_test.go index 74b050b93..10427ce5d 100644 --- a/internal/connection/reconnector_test.go +++ b/internal/connection/reconnector_test.go @@ -21,7 +21,9 @@ ) type fakeConnector struct { - nextConnectionID int32 + // +checkatomic + nextConnectionID int32 + maxConnections int connectionConcurrency int nextError error diff --git a/internal/faketime/faketime.go b/internal/faketime/faketime.go index f14519af9..c11337abd 100644 --- a/internal/faketime/faketime.go +++ b/internal/faketime/faketime.go @@ -27,6 +27,7 @@ func AutoAdvance(t time.Time, dt time.Duration) func() time.Time { // TimeAdvance allows controlling the passage of time. Intended to be used in // tests. type TimeAdvance struct { + // +checkatomic delta int64 autoDt int64 base time.Time @@ -60,7 +61,8 @@ func (t *TimeAdvance) Advance(dt time.Duration) time.Time { // ClockTimeWithOffset allows controlling the passage of time. Intended to be used in // tests. type ClockTimeWithOffset struct { - mu sync.Mutex + mu sync.Mutex + // +checklocks:mu offset time.Duration } diff --git a/internal/fusemount/fusefs.go b/internal/fusemount/fusefs.go index 13e0d34f5..7c08c7820 100644 --- a/internal/fusemount/fusefs.go +++ b/internal/fusemount/fusefs.go @@ -66,9 +66,13 @@ func (f *fuseFileNode) Open(ctx context.Context, flags uint32) (gofusefs.FileHan } type fuseFileHandle struct { - mu sync.Mutex + mu sync.Mutex + + // +checklocks:mu reader fs.Reader - file fs.File + + // +checklocks:mu + file fs.File } func (f *fuseFileHandle) Read(ctx context.Context, dest []byte, off int64) (fuse.ReadResult, syscall.Errno) { @@ -93,6 +97,9 @@ func (f *fuseFileHandle) Read(ctx context.Context, dest []byte, off int64) (fuse } func (f *fuseFileHandle) Release(ctx context.Context) syscall.Errno { + f.mu.Lock() + defer f.mu.Unlock() + f.reader.Close() //nolint:errcheck return gofusefs.OK diff --git a/internal/gather/gather_write_buffer.go b/internal/gather/gather_write_buffer.go index ec55e3d0c..bc80bb41a 100644 --- a/internal/gather/gather_write_buffer.go +++ b/internal/gather/gather_write_buffer.go @@ -7,7 +7,7 @@ "github.com/kopia/kopia/repo/logging" ) -var log = logging.Module("gather") +var log = logging.Module("gather") // +checklocksignore // WriteBuffer is a write buffer for content of unknown size that manages // data in a series of byte slices of uniform size. diff --git a/internal/gather/gather_write_buffer_chunk.go b/internal/gather/gather_write_buffer_chunk.go index edd9f3e02..6fe1b15ef 100644 --- a/internal/gather/gather_write_buffer_chunk.go +++ b/internal/gather/gather_write_buffer_chunk.go @@ -45,17 +45,28 @@ type chunkAllocator struct { name string chunkSize int - mu sync.Mutex - freeList [][]byte - maxFreeListSize int + mu sync.Mutex + // +checklocks:mu + freeList [][]byte + // +checklocks:mu + maxFreeListSize int + // +checklocks:mu freeListHighWaterMark int - allocHighWaterMark int - allocated int - slicesAllocated int - freed int - activeChunks map[uintptr]string + // +checklocks:mu + allocHighWaterMark int + // +checklocks:mu + allocated int + // +checklocks:mu + slicesAllocated int + + // +checklocks:mu + freed int + + // +checklocks:mu + activeChunks map[uintptr]string } +// +checklocks:a.mu func (a *chunkAllocator) trackAlloc(v []byte) []byte { if trackChunkAllocations { var ( diff --git a/internal/iocopy/copy.go b/internal/iocopy/copy.go index ecc5f53ed..9f6679a28 100644 --- a/internal/iocopy/copy.go +++ b/internal/iocopy/copy.go @@ -9,8 +9,10 @@ const bufSize = 65536 var ( - mu sync.Mutex //nolint:gochecknoglobals - buffers [][]byte //nolint:gochecknoglobals + mu sync.Mutex //nolint:gochecknoglobals + + // +checklocks:mu + buffers [][]byte //nolint:gochecknoglobals ) // GetBuffer allocates new temporary buffer suitable for copying data. diff --git a/internal/logfile/logfile.go b/internal/logfile/logfile.go index fb97fa0e6..eb7df7ae3 100644 --- a/internal/logfile/logfile.go +++ b/internal/logfile/logfile.go @@ -373,14 +373,26 @@ func logLevelFromFlag(levelString string) zapcore.LevelEnabler { } type onDemandFile struct { - segmentCounter int // number of segments written - currentSegmentSize int // number of bytes written to current segment - maxSegmentSize int + // +checklocks:mu + segmentCounter int // number of segments written + + // +checklocks:mu + currentSegmentSize int // number of bytes written to current segment + + // +checklocks:mu + maxSegmentSize int + + // +checklocks:mu currentSegmentFilename string - logDir string + // +checklocks:mu + logDir string + + // +checklocks:mu logFileBaseName string - symlinkName string + + // +checklocks:mu + symlinkName string startSweep func() diff --git a/internal/memtrack/memtrack.go b/internal/memtrack/memtrack.go index 600cba043..de786d097 100644 --- a/internal/memtrack/memtrack.go +++ b/internal/memtrack/memtrack.go @@ -10,14 +10,27 @@ "github.com/kopia/kopia/repo/logging" ) -var log = logging.Module("memtrack") +var log = logging.Module("memtrack") // +checklocksignore type tracker struct { - name string - memoryTrackerMutex sync.Mutex - initialMemStats runtime.MemStats - previousMemStats runtime.MemStats - maxAlloc, maxHeapUsage, maxStackInUse uint64 + name string + + memoryTrackerMutex sync.Mutex + + // +checklocks:memoryTrackerMutex + initialMemStats runtime.MemStats + + // +checklocks:memoryTrackerMutex + previousMemStats runtime.MemStats + + // +checklocks:memoryTrackerMutex + maxAlloc uint64 + + // +checklocks:memoryTrackerMutex + maxHeapUsage uint64 + + // +checklocks:memoryTrackerMutex + maxStackInUse uint64 } func (c *tracker) dump(ctx context.Context, desc string) { diff --git a/internal/ownwrites/ownwrites.go b/internal/ownwrites/ownwrites.go index f1aab0384..0be18d353 100644 --- a/internal/ownwrites/ownwrites.go +++ b/internal/ownwrites/ownwrites.go @@ -39,7 +39,9 @@ type CacheStorage struct { prefixes []blob.ID cacheDuration time.Duration - mu sync.Mutex + mu sync.Mutex + + // +checklocks:mu nextSweepTime time.Time } diff --git a/internal/providervalidation/providervalidation.go b/internal/providervalidation/providervalidation.go index 9b4700455..7b2a964c9 100644 --- a/internal/providervalidation/providervalidation.go +++ b/internal/providervalidation/providervalidation.go @@ -221,9 +221,12 @@ type concurrencyTest struct { prefix blob.ID deadline time.Time - mu sync.Mutex - blobData map[blob.ID][]byte - blobIDs []blob.ID + mu sync.Mutex + // +checklocks:mu + blobData map[blob.ID][]byte + // +checklocks:mu + blobIDs []blob.ID + // +checklocks:mu blobWritten map[blob.ID]bool } diff --git a/internal/stats/countsum.go b/internal/stats/countsum.go index a098075ce..44a17bc80 100644 --- a/internal/stats/countsum.go +++ b/internal/stats/countsum.go @@ -9,7 +9,9 @@ // CountSum holds sum and count values. type CountSum struct { - sum int64 + // +checkatomic + sum int64 + // +checkatomic count uint32 } diff --git a/internal/stats/countsum_mutex.go b/internal/stats/countsum_mutex.go index 58955c26e..df25e5e57 100644 --- a/internal/stats/countsum_mutex.go +++ b/internal/stats/countsum_mutex.go @@ -11,8 +11,10 @@ // CountSum holds sum and count values. type CountSum struct { - mu sync.Mutex - sum int64 + mu sync.Mutex + // +checklocks:mu + sum int64 + // +checklocks:mu count uint32 } diff --git a/internal/uitask/uitask.go b/internal/uitask/uitask.go index 4536dfe55..1ec88effb 100644 --- a/internal/uitask/uitask.go +++ b/internal/uitask/uitask.go @@ -73,9 +73,11 @@ type Info struct { type runningTaskInfo struct { Info - mu sync.Mutex - maxLogMessages int - taskCancel []context.CancelFunc + maxLogMessages int // +checklocksignore + + mu sync.Mutex + // +checklocks:mu + taskCancel []context.CancelFunc } // CurrentTaskID implements the Controller interface. diff --git a/internal/uitask/uitask_manager.go b/internal/uitask/uitask_manager.go index e9fa03afc..3165a9bc2 100644 --- a/internal/uitask/uitask_manager.go +++ b/internal/uitask/uitask_manager.go @@ -21,13 +21,16 @@ // Manager manages UI tasks. type Manager struct { - mu sync.Mutex + mu sync.Mutex + // +checklocks:mu nextTaskID int - running map[string]*runningTaskInfo - finished map[string]*Info + // +checklocks:mu + running map[string]*runningTaskInfo + // +checklocks:mu + finished map[string]*Info - MaxFinishedTasks int - MaxLogMessagesPerTask int + MaxFinishedTasks int // +checklocksignore + MaxLogMessagesPerTask int // +checklocksignore } // Controller allows the task to communicate with task manager and receive signals. diff --git a/internal/webdavmount/webdavmount.go b/internal/webdavmount/webdavmount.go index 5a98a5b28..024c74a7f 100644 --- a/internal/webdavmount/webdavmount.go +++ b/internal/webdavmount/webdavmount.go @@ -25,12 +25,15 @@ type webdavFile struct { // webdavFile implements webdav.File but needs context - ctx context.Context //nolint:containedctx + // +checklocks:mu + ctx context.Context // nolint:containedctx entry fs.File mu sync.Mutex - r fs.Reader + + // +checklocks:mu + r fs.Reader } func (f *webdavFile) Readdir(n int) ([]os.FileInfo, error) { diff --git a/internal/workshare/workshare_pool.go b/internal/workshare/workshare_pool.go index a126921f8..f122b1f77 100644 --- a/internal/workshare/workshare_pool.go +++ b/internal/workshare/workshare_pool.go @@ -19,6 +19,7 @@ type workItem struct { // Pool manages a pool of generic workers that can process workItem. type Pool struct { + // +checkatomic activeWorkers int32 semaphore chan struct{} diff --git a/repo/blob/filesystem/osinterface_mock_test.go b/repo/blob/filesystem/osinterface_mock_test.go index 5d97dc30b..533144b16 100644 --- a/repo/blob/filesystem/osinterface_mock_test.go +++ b/repo/blob/filesystem/osinterface_mock_test.go @@ -13,21 +13,36 @@ var errNonRetriable = errors.Errorf("some non-retriable error") type mockOS struct { - readFileRemainingErrors int32 - writeFileRemainingErrors int32 - writeFileCloseRemainingErrors int32 - createNewFileRemainingErrors int32 - mkdirAllRemainingErrors int32 - renameRemainingErrors int32 - removeRemainingRetriableErrors int32 - removeRemainingNonRetriableErrors int32 - chownRemainingErrors int32 - readDirRemainingErrors int32 - readDirRemainingNonRetriableErrors int32 + // +checkatomic + readFileRemainingErrors int32 + // +checkatomic + writeFileRemainingErrors int32 + // +checkatomic + writeFileCloseRemainingErrors int32 + // +checkatomic + createNewFileRemainingErrors int32 + // +checkatomic + mkdirAllRemainingErrors int32 + // +checkatomic + renameRemainingErrors int32 + // +checkatomic + removeRemainingRetriableErrors int32 + // +checkatomic + removeRemainingNonRetriableErrors int32 + // +checkatomic + chownRemainingErrors int32 + // +checkatomic + readDirRemainingErrors int32 + // +checkatomic + readDirRemainingNonRetriableErrors int32 + // +checkatomic readDirRemainingFileDeletedDirEntry int32 - readDirRemainingFatalDirEntry int32 - statRemainingErrors int32 - chtimesRemainingErrors int32 + // +checkatomic + readDirRemainingFatalDirEntry int32 + // +checkatomic + statRemainingErrors int32 + // +checkatomic + chtimesRemainingErrors int32 effectiveUID int diff --git a/repo/blob/gdrive/file_id_cache.go b/repo/blob/gdrive/file_id_cache.go index 6b87d8e20..11dd14a56 100644 --- a/repo/blob/gdrive/file_id_cache.go +++ b/repo/blob/gdrive/file_id_cache.go @@ -14,14 +14,17 @@ type fileIDCache struct { // Map of blobID -> *cacheEntry. Blobs sync.Map + + // Guards access to ChangeLog. + mu sync.RWMutex // Record of recent cache changes. // It's stored as a synchronized circular buffer. - ChangeLog [changeLogCacheSize]changeEntry - // ChangeLogIdx indicates the next location to write to. - // The log entry range is [ChangeLogIdx+1, ChangeLogIdx-1]. - ChangeLogIdx int - // Guards access to ChangeLog. - ChangeLogMut sync.RWMutex + // +checklocks:mu + changeLog [changeLogCacheSize]changeEntry + // changeLogIdx indicates the next location to write to. + // The log entry range is [changeLogIdx+1, changeLogIdx-1]. + // +checklocks:mu + changeLogIdx int } // cacheEntry is a blob cache entry. @@ -79,36 +82,39 @@ func (cache *fileIDCache) BlindPut(blobID blob.ID, fileID string) { // RecordBlobChange records a newly created or deleted blob. // An empty fileID signals that the blob is deleted. func (cache *fileIDCache) RecordBlobChange(blobID blob.ID, fileID string) { - cache.ChangeLogMut.Lock() + cache.mu.Lock() - i := cache.ChangeLogIdx - cache.ChangeLog[i] = changeEntry{ + i := cache.changeLogIdx + cache.changeLog[i] = changeEntry{ BlobID: blobID, FileID: fileID, } - cache.ChangeLogIdx = circularBufferNext(i) + cache.changeLogIdx = circularBufferNext(i) - cache.ChangeLogMut.Unlock() + cache.mu.Unlock() } // VisitBlobChanges iterates through newly created or deleted blobs. func (cache *fileIDCache) VisitBlobChanges(callback func(blobID blob.ID, fileID string)) { - cache.ChangeLogMut.RLock() + cache.mu.RLock() - for i := circularBufferNext(cache.ChangeLogIdx); i != cache.ChangeLogIdx; i = circularBufferNext(i) { - entry := cache.ChangeLog[i] + for i := circularBufferNext(cache.changeLogIdx); i != cache.changeLogIdx; i = circularBufferNext(i) { + entry := cache.changeLog[i] if entry.BlobID != "" { callback(entry.BlobID, entry.FileID) } } - cache.ChangeLogMut.RUnlock() + cache.mu.RUnlock() } // Clear resets the file ID cache. func (cache *fileIDCache) Clear() { + cache.mu.Lock() + defer cache.mu.Unlock() + cache.Blobs = sync.Map{} - cache.ChangeLog = [changeLogCacheSize]changeEntry{} + cache.changeLog = [changeLogCacheSize]changeEntry{} } func circularBufferNext(curr int) int { @@ -123,8 +129,8 @@ func circularBufferNext(curr int) int { func newFileIDCache() *fileIDCache { return &fileIDCache{ Blobs: sync.Map{}, - ChangeLog: [changeLogCacheSize]changeEntry{}, - ChangeLogIdx: 0, - ChangeLogMut: sync.RWMutex{}, + changeLog: [changeLogCacheSize]changeEntry{}, + changeLogIdx: 0, + mu: sync.RWMutex{}, } } diff --git a/repo/blob/logging/logging_storage.go b/repo/blob/logging/logging_storage.go index 8dc9a5764..ed5bfb481 100644 --- a/repo/blob/logging/logging_storage.go +++ b/repo/blob/logging/logging_storage.go @@ -11,7 +11,9 @@ ) type loggingStorage struct { - concurrency int32 + // +checkatomic + concurrency int32 + // +checkatomic maxConcurrency int32 base blob.Storage diff --git a/repo/blob/s3/s3_storage_test.go b/repo/blob/s3/s3_storage_test.go index dc6ed849e..37efcc9b2 100644 --- a/repo/blob/s3/s3_storage_test.go +++ b/repo/blob/s3/s3_storage_test.go @@ -736,6 +736,7 @@ func createMinioSessionToken(t *testing.T, minioEndpoint, kopiaUserName, kopiaUs // provider to force expiration of the credentials. This causes // the next call to Retrieve to return expired credentials. type customProvider struct { + // +checkatomic forceExpired atomic.Value stsProvider miniocreds.STSAssumeRole } diff --git a/repo/blob/sharded/sharded.go b/repo/blob/sharded/sharded.go index 8002da7ed..a679b59fc 100644 --- a/repo/blob/sharded/sharded.go +++ b/repo/blob/sharded/sharded.go @@ -20,7 +20,7 @@ // CompleteBlobSuffix is the extension for sharded blobs that have completed writing. const CompleteBlobSuffix = ".f" -var log = logging.Module("sharded") +var log = logging.Module("sharded") // +checklocksignore // Impl must be implemented by underlying provided. type Impl interface { @@ -39,7 +39,9 @@ type Storage struct { Options parametersMutex sync.Mutex - parameters *Parameters + + // +checklocks:parametersMutex + parameters *Parameters } // GetBlob implements blob.Storage. diff --git a/repo/blob/throttling/throttler.go b/repo/blob/throttling/throttler.go index 731248ba4..53aa80f67 100644 --- a/repo/blob/throttling/throttler.go +++ b/repo/blob/throttling/throttler.go @@ -17,7 +17,8 @@ type SettableThrottler interface { } type tokenBucketBasedThrottler struct { - mu sync.Mutex + mu sync.Mutex + // +checklocks:mu limits Limits readOps *tokenBucket @@ -25,7 +26,7 @@ type tokenBucketBasedThrottler struct { listOps *tokenBucket upload *tokenBucket download *tokenBucket - window time.Duration + window time.Duration // +checklocksignore } func (t *tokenBucketBasedThrottler) BeforeOperation(ctx context.Context, op string) { diff --git a/repo/content/committed_content_index.go b/repo/content/committed_content_index.go index a86a3bc19..039d8b8b1 100644 --- a/repo/content/committed_content_index.go +++ b/repo/content/committed_content_index.go @@ -24,13 +24,17 @@ const smallIndexEntryCountThreshold = 100 type committedContentIndex struct { + // +checkatomic rev int64 cache committedContentIndexCache - mu sync.Mutex + mu sync.Mutex + // +checklocks:mu deletionWatermark time.Time - inUse map[blob.ID]packIndex - merged mergedIndex + // +checklocks:mu + inUse map[blob.ID]packIndex + // +checklocks:mu + merged mergedIndex v1PerContentOverhead uint32 indexVersion int @@ -58,7 +62,7 @@ func (c *committedContentIndex) getContent(contentID ID) (Info, error) { info, err := c.merged.GetInfo(contentID) if info != nil { - if c.shouldIgnore(info) { + if shouldIgnore(info, c.deletionWatermark) { return nil, ErrContentNotFound } @@ -72,12 +76,12 @@ func (c *committedContentIndex) getContent(contentID ID) (Info, error) { return nil, err } -func (c *committedContentIndex) shouldIgnore(id Info) bool { +func shouldIgnore(id Info, deletionWatermark time.Time) bool { if !id.GetDeleted() { return false } - return !id.Timestamp().After(c.deletionWatermark) + return !id.Timestamp().After(deletionWatermark) } func (c *committedContentIndex) addIndexBlob(ctx context.Context, indexBlobID blob.ID, data gather.Bytes, use bool) error { @@ -85,7 +89,9 @@ func (c *committedContentIndex) addIndexBlob(ctx context.Context, indexBlobID bl // doing it prematurely might confuse callers of revision() who may cache // a set of old contents and associate it with new revision, before new contents // are actually available. - defer atomic.AddInt64(&c.rev, 1) + defer func() { + atomic.AddInt64(&c.rev, 1) + }() if err := c.cache.addContentToCache(ctx, indexBlobID, data); err != nil { return errors.Wrap(err, "error adding content to cache") @@ -118,10 +124,11 @@ func (c *committedContentIndex) addIndexBlob(ctx context.Context, indexBlobID bl func (c *committedContentIndex) listContents(r IDRange, cb func(i Info) error) error { c.mu.Lock() m := append(mergedIndex(nil), c.merged...) + deletionWatermark := c.deletionWatermark c.mu.Unlock() return m.Iterate(r, func(i Info) error { - if c.shouldIgnore(i) { + if shouldIgnore(i, deletionWatermark) { return nil } @@ -129,6 +136,7 @@ func (c *committedContentIndex) listContents(r IDRange, cb func(i Info) error) e }) } +// +checklocks:c.mu func (c *committedContentIndex) indexFilesChanged(indexFiles []blob.ID) bool { if len(indexFiles) != len(c.inUse) { return true diff --git a/repo/content/committed_content_index_mem_cache.go b/repo/content/committed_content_index_mem_cache.go index c10577da3..144eed9c6 100644 --- a/repo/content/committed_content_index_mem_cache.go +++ b/repo/content/committed_content_index_mem_cache.go @@ -12,9 +12,12 @@ ) type memoryCommittedContentIndexCache struct { - mu sync.Mutex - contents map[blob.ID]packIndex - v1PerContentOverhead uint32 + mu sync.Mutex + + // +checklocks:mu + contents map[blob.ID]packIndex + + v1PerContentOverhead uint32 // +checklocksignore } func (m *memoryCommittedContentIndexCache) hasIndexBlobID(ctx context.Context, indexBlobID blob.ID) (bool, error) { diff --git a/repo/content/committed_read_manager.go b/repo/content/committed_read_manager.go index 01e0c8eff..ce807517b 100644 --- a/repo/content/committed_read_manager.go +++ b/repo/content/committed_read_manager.go @@ -71,8 +71,10 @@ type indexBlobManager interface { // SharedManager is responsible for read-only access to committed data. type SharedManager struct { + // +checkatomic refCount int32 // number of Manager objects that refer to this SharedManager - closed int32 // set to 1 if shared manager has been closed + // +checkatomic + closed int32 // set to 1 if shared manager has been closed Stats *Stats st blob.Storage @@ -94,6 +96,7 @@ type SharedManager struct { indexesLock sync.RWMutex // maybeRefreshIndexes() will call Refresh() after this point in ime. + // +checklocks:indexesLock refreshIndexesAfter time.Time format FormattingOptions @@ -182,6 +185,7 @@ func (sm *SharedManager) attemptReadPackFileLocalIndex(ctx context.Context, pack "unable to decrypt local index") } +// +checklocks:sm.indexesLock func (sm *SharedManager) loadPackIndexesLocked(ctx context.Context) error { nextSleepTime := 100 * time.Millisecond //nolint:gomnd @@ -607,6 +611,9 @@ func NewSharedManager(ctx context.Context, st blob.Storage, f *FormattingOptions return nil, errors.Wrap(err, "error setting up read manager caches") } + sm.indexesLock.Lock() + defer sm.indexesLock.Unlock() + if err := sm.loadPackIndexesLocked(ctx); err != nil { return nil, errors.Wrap(err, "error loading indexes") } diff --git a/repo/content/content_index_recovery.go b/repo/content/content_index_recovery.go index 18de5f5f6..deecfd372 100644 --- a/repo/content/content_index_recovery.go +++ b/repo/content/content_index_recovery.go @@ -29,20 +29,26 @@ func (bm *WriteManager) RecoverIndexFromPackBlob(ctx context.Context, packFile b var recovered []Info - bm.lock() - defer bm.unlock() - err = ndx.Iterate(AllIDs, func(i Info) error { - recovered = append(recovered, i) - if commit { - // 'i' is ephemeral and will depend on temporary buffers which - // won't be available when this function returns, we need to - // convert it to durable struct. - bm.packIndexBuilder.Add(ToInfoStruct(i)) - } + // 'i' is ephemeral and will depend on temporary buffers which + // won't be available when this function returns, we need to + // convert it to durable struct. + is := ToInfoStruct(i) + + recovered = append(recovered, is) + return nil }) + if commit { + bm.lock() + defer bm.unlock() + + for _, is := range recovered { + bm.packIndexBuilder.Add(is) + } + } + return recovered, errors.Wrap(err, "error iterating index entries") } diff --git a/repo/content/content_manager.go b/repo/content/content_manager.go index 753e28650..4a1b69b48 100644 --- a/repo/content/content_manager.go +++ b/repo/content/content_manager.go @@ -79,11 +79,15 @@ type IndexBlobInfo struct { // WriteManager builds content-addressable storage with encryption, deduplication and packaging on top of BLOB store. type WriteManager struct { - revision int64 // changes on each local write + // +checkatomic + revision int64 // changes on each local write + // +checkatomic disableIndexRefresh int32 - mu *sync.RWMutex - cond *sync.Cond + mu sync.RWMutex + // +checklocks:mu + cond *sync.Cond + // +checklocks:mu flushing bool sessionUser string // user@host to report as session owners @@ -92,13 +96,19 @@ type WriteManager struct { currentSessionInfo SessionInfo sessionMarkerBlobIDs []blob.ID // session marker blobs written so far - pendingPacks map[blob.ID]*pendingPackInfo - writingPacks []*pendingPackInfo // list of packs that are being written - failedPacks []*pendingPackInfo // list of packs that failed to write, will be retried - packIndexBuilder packIndexBuilder // contents that are in index currently being built (all packs saved but not committed) + // +checklocks:mu + pendingPacks map[blob.ID]*pendingPackInfo + // +checklocks:mu + writingPacks []*pendingPackInfo // list of packs that are being written + // +checklocks:mu + failedPacks []*pendingPackInfo // list of packs that failed to write, will be retried + // +checklocks:mu + packIndexBuilder packIndexBuilder // contents that are in index currently being built (all packs saved but not committed) + // +checklocks:mu disableIndexFlushCount int - flushPackIndexesAfter time.Time // time when those indexes should be flushed + // +checklocks:mu + flushPackIndexesAfter time.Time // time when those indexes should be flushed onUpload func(int64) @@ -177,6 +187,7 @@ func (bm *WriteManager) maybeRefreshIndexes(ctx context.Context) error { } // Intentionally passing bi by value. +// +checklocks:bm.mu func (bm *WriteManager) deletePreexistingContent(ctx context.Context, ci Info) error { if ci.GetDeleted() { return nil @@ -223,7 +234,7 @@ func (bm *WriteManager) maybeRetryWritingFailedPacksUnlocked(ctx context.Context for _, pp := range fp { bm.log.Debugf("retry-write %v", pp.packBlobID) - if err := bm.writePackAndAddToIndexLocked(ctx, pp, true); err != nil { + if err := bm.writePackAndAddToIndexLocked(ctx, pp); err != nil { return errors.Wrap(err, "error writing previously failed pack") } } @@ -275,7 +286,7 @@ func (bm *WriteManager) addToPackUnlocked(ctx context.Context, contentID ID, dat for _, pp := range fp { bm.log.Debugf("retry-write %v", pp.packBlobID) - if err = bm.writePackAndAddToIndexLocked(ctx, pp, true); err != nil { + if err = bm.writePackAndAddToIndexLocked(ctx, pp); err != nil { bm.unlock() return errors.Wrap(err, "error writing previously failed pack") } @@ -298,6 +309,7 @@ func (bm *WriteManager) addToPackUnlocked(ctx context.Context, contentID ID, dat } if _, err := compressedAndEncrypted.Bytes().WriteTo(pp.currentPackData); err != nil { + bm.unlock() return errors.Wrapf(err, "unable to append %q to pack data", contentID) } @@ -319,7 +331,7 @@ func (bm *WriteManager) addToPackUnlocked(ctx context.Context, contentID ID, dat // at this point we're unlocked so different goroutines can encrypt and // save to storage in parallel. if shouldWrite { - if err := bm.writePackAndAddToIndexLocked(ctx, pp, false); err != nil { + if err := bm.acquireLockAndWritePackAndAddToIndex(ctx, pp); err != nil { return errors.Wrap(err, "unable to write pack") } } @@ -344,11 +356,13 @@ func (bm *WriteManager) EnableIndexFlush(ctx context.Context) { bm.disableIndexFlushCount-- } +// +checklocks:bm.mu func (bm *WriteManager) verifyInvariantsLocked() { bm.verifyCurrentPackItemsLocked() bm.verifyPackIndexBuilderLocked() } +// +checklocks:bm.mu func (bm *WriteManager) verifyCurrentPackItemsLocked() { for _, pp := range bm.pendingPacks { for k, cpi := range pp.currentPackItems { @@ -363,6 +377,7 @@ func (bm *WriteManager) verifyCurrentPackItemsLocked() { } } +// +checklocks:bm.mu func (bm *WriteManager) verifyPackIndexBuilderLocked() { for k, cpi := range bm.packIndexBuilder { bm.assertInvariant(cpi.GetContentID() == k, "content ID entry has invalid key: %v %v", cpi.GetContentID(), k) @@ -390,6 +405,18 @@ func (bm *WriteManager) assertInvariant(ok bool, errorMsg string, arg ...interfa panic(errorMsg) } +// +checklocksread:bm.indexesLock +func (bm *WriteManager) writeIndexBlobs(ctx context.Context, dataShards []gather.Bytes, sessionID SessionID) ([]blob.Metadata, error) { + // nolint:wrapcheck + return bm.indexBlobManager.writeIndexBlobs(ctx, dataShards, sessionID) +} + +// +checklocksread:bm.indexesLock +func (bm *WriteManager) addIndexBlob(ctx context.Context, indexBlobID blob.ID, data gather.Bytes, use bool) error { + return bm.committedContents.addIndexBlob(ctx, indexBlobID, data, use) +} + +// +checklocks:bm.mu func (bm *WriteManager) flushPackIndexesLocked(ctx context.Context) error { if bm.disableIndexFlushCount > 0 { bm.log.Debugf("not flushing index because flushes are currently disabled") @@ -410,7 +437,7 @@ func (bm *WriteManager) flushPackIndexesLocked(ctx context.Context) error { bm.indexesLock.RLock() defer bm.indexesLock.RUnlock() - indexBlobMDs, err := bm.indexBlobManager.writeIndexBlobs(ctx, dataShards, bm.currentSessionInfo.ID) + indexBlobMDs, err := bm.writeIndexBlobs(ctx, dataShards, bm.currentSessionInfo.ID) if err != nil { return errors.Wrap(err, "error writing index blob") } @@ -425,7 +452,7 @@ func (bm *WriteManager) flushPackIndexesLocked(ctx context.Context) error { for i, indexBlobMD := range indexBlobMDs { bm.onUpload(int64(dataShards[i].Length())) - if err := bm.committedContents.addIndexBlob(ctx, indexBlobMD.BlobID, dataShards[i], true); err != nil { + if err := bm.addIndexBlob(ctx, indexBlobMD.BlobID, dataShards[i], true); err != nil { return errors.Wrap(err, "unable to add committed content") } } @@ -438,12 +465,13 @@ func (bm *WriteManager) flushPackIndexesLocked(ctx context.Context) error { return nil } +// +checklocks:bm.mu func (bm *WriteManager) finishAllPacksLocked(ctx context.Context) error { for prefix, pp := range bm.pendingPacks { delete(bm.pendingPacks, prefix) bm.writingPacks = append(bm.writingPacks, pp) - if err := bm.writePackAndAddToIndexLocked(ctx, pp, true); err != nil { + if err := bm.writePackAndAddToIndexLocked(ctx, pp); err != nil { return errors.Wrap(err, "error writing pack content") } } @@ -451,18 +479,19 @@ func (bm *WriteManager) finishAllPacksLocked(ctx context.Context) error { return nil } -func (bm *WriteManager) writePackAndAddToIndexLocked(ctx context.Context, pp *pendingPackInfo, holdingLock bool) error { +func (bm *WriteManager) acquireLockAndWritePackAndAddToIndex(ctx context.Context, pp *pendingPackInfo) error { + bm.lock() + defer bm.unlock() + + defer bm.cond.Broadcast() + + return bm.writePackAndAddToIndexLocked(ctx, pp) +} + +// +checklocks:bm.mu +func (bm *WriteManager) writePackAndAddToIndexLocked(ctx context.Context, pp *pendingPackInfo) error { packFileIndex, err := bm.prepareAndWritePackInternal(ctx, pp) - if !holdingLock { - bm.lock() - - defer func() { - bm.cond.Broadcast() - bm.unlock() - }() - } - // after finishing writing, remove from both writingPacks and failedPacks bm.writingPacks = removePendingPack(bm.writingPacks, pp) bm.failedPacks = removePendingPack(bm.failedPacks, pp) @@ -524,6 +553,11 @@ func (bm *WriteManager) Close(ctx context.Context) error { return bm.SharedManager.release(ctx) } +// +checklocks:bm.mu +func (bm *WriteManager) setFlushingLocked(v bool) { + bm.flushing = v +} + // Flush completes writing any pending packs and writes pack indexes to the underlying storage. // Any pending writes completed before Flush() has started are guaranteed to be committed to the // repository before Flush() returns. @@ -533,14 +567,11 @@ func (bm *WriteManager) Flush(ctx context.Context) error { bm.log.Debugf("flush") - bm.flushing = true + // when finished flushing, notify goroutines that were waiting for it. + defer bm.cond.Broadcast() - defer func() { - bm.flushing = false - - // we finished flushing, notify goroutines that were waiting for it. - bm.cond.Broadcast() - }() + bm.setFlushingLocked(true) + defer bm.setFlushingLocked(false) // see if we have any packs that have failed previously // retry writing them now. @@ -551,7 +582,7 @@ func (bm *WriteManager) Flush(ctx context.Context) error { for _, pp := range fp { bm.log.Debugf("retry-write %v", pp.packBlobID) - if err := bm.writePackAndAddToIndexLocked(ctx, pp, true); err != nil { + if err := bm.writePackAndAddToIndexLocked(ctx, pp); err != nil { return errors.Wrap(err, "error writing previously failed pack") } } @@ -644,6 +675,7 @@ func packPrefixForContentID(contentID ID) blob.ID { return PackBlobIDPrefixRegular } +// +checklocks:bm.mu func (bm *WriteManager) getOrCreatePendingPackInfoLocked(ctx context.Context, prefix blob.ID) (*pendingPackInfo, error) { if pp := bm.pendingPacks[prefix]; pp != nil { return pp, nil @@ -745,6 +777,7 @@ func (bm *WriteManager) GetContent(ctx context.Context, contentID ID) (v []byte, return tmp.ToByteSlice(), nil } +// +checklocksread:bm.mu func (bm *WriteManager) getOverlayContentInfoReadLocked(contentID ID) (*pendingPackInfo, Info, bool) { // check added contents, not written to any packs yet. for _, pp := range bm.pendingPacks { @@ -768,6 +801,7 @@ func (bm *WriteManager) getOverlayContentInfoReadLocked(contentID ID) (*pendingP return nil, nil, false } +// +checklocksread:bm.mu func (bm *WriteManager) getContentInfoReadLocked(ctx context.Context, contentID ID) (*pendingPackInfo, Info, error) { if pp, ci, ok := bm.getOverlayContentInfoReadLocked(contentID); ok { return pp, ci, nil @@ -802,10 +836,12 @@ func (bm *WriteManager) DisableIndexRefresh() { atomic.StoreInt32(&bm.disableIndexRefresh, 1) } +// +checklocksacquire:bm.mu func (bm *WriteManager) lock() { bm.mu.Lock() } +// +checklocksrelease:bm.mu func (bm *WriteManager) unlock() { if bm.checkInvariantsOnUnlock { bm.verifyInvariantsLocked() @@ -863,20 +899,15 @@ type SessionOptions struct { // NewWriteManager returns a session write manager. func NewWriteManager(ctx context.Context, sm *SharedManager, options SessionOptions, writeManagerID string) *WriteManager { - mu := &sync.RWMutex{} - sm.addRef() if options.OnUpload == nil { options.OnUpload = func(int64) {} } - return &WriteManager{ + wm := &WriteManager{ SharedManager: sm, - mu: mu, - cond: sync.NewCond(mu), - flushPackIndexesAfter: sm.timeNow().Add(flushPackIndexTimeout), pendingPacks: map[blob.ID]*pendingPackInfo{}, packIndexBuilder: make(packIndexBuilder), @@ -886,4 +917,8 @@ func NewWriteManager(ctx context.Context, sm *SharedManager, options SessionOpti log: sm.namedLogger(writeManagerID), } + + wm.cond = sync.NewCond(&wm.mu) + + return wm } diff --git a/repo/content/internal_logger.go b/repo/content/internal_logger.go index 7600d7c0c..3871a3ccb 100644 --- a/repo/content/internal_logger.go +++ b/repo/content/internal_logger.go @@ -25,6 +25,7 @@ const TextLogBlobPrefix = "_log_" type internalLogManager struct { + // +checkatomic enabled int32 // set by enable(), logger is ineffective until called // internalLogManager implements io.Writer and we must be able to write to the @@ -97,14 +98,20 @@ func (m *internalLogManager) NewLogger() *zap.SugaredLogger { // internalLogger represents a single log session that saves log files as blobs in the repository. // The logger starts disabled and to actually persist logs enable() must be called. type internalLogger struct { + // +checkatomic nextChunkNumber int32 // chunk number incremented using atomic.AddInt32() - m *internalLogManager - mu sync.Mutex - buf *gather.WriteBuffer - gzw *gzip.Writer + m *internalLogManager + mu sync.Mutex + + // +checklocks:mu + buf *gather.WriteBuffer + // +checklocks:mu + gzw *gzip.Writer + startTime int64 // unix timestamp of the first log - prefix blob.ID + + prefix blob.ID // +checklocksignore } func (m *internalLogManager) enable() { @@ -133,9 +140,7 @@ func (l *internalLogger) maybeEncryptAndWriteChunkUnlocked(data gather.Bytes, cl endTime := l.m.timeFunc().Unix() - l.mu.Lock() prefix := blob.ID(fmt.Sprintf("%v_%v_%v_%v_", l.prefix, l.startTime, endTime, atomic.AddInt32(&l.nextChunkNumber, 1))) - l.mu.Unlock() l.m.encryptAndWriteLogBlob(prefix, data, closeFunc) } @@ -156,6 +161,7 @@ func (l *internalLogger) addAndMaybeFlush(b []byte) (payload gather.Bytes, close return l.flushAndResetLocked() } +// +checklocks:l.mu func (l *internalLogger) ensureWriterInitializedLocked() io.Writer { if l.gzw == nil { l.buf = gather.NewWriteBuffer() @@ -166,6 +172,7 @@ func (l *internalLogger) ensureWriterInitializedLocked() io.Writer { return l.gzw } +// +checklocks:l.mu func (l *internalLogger) flushAndResetLocked() (payload gather.Bytes, closeFunc func()) { if l.gzw == nil { return gather.Bytes{}, func() {} diff --git a/repo/content/stats.go b/repo/content/stats.go index c460d5b5f..43085527d 100644 --- a/repo/content/stats.go +++ b/repo/content/stats.go @@ -8,17 +8,27 @@ type Stats struct { // Keep int64 fields first to ensure they get aligned to at least 64-bit // boundaries, which is required for atomic access on ARM and x86-32. - readBytes int64 - writtenBytes int64 + // +checkatomic + readBytes int64 + // +checkatomic + writtenBytes int64 + // +checkatomic decryptedBytes int64 + // +checkatomic encryptedBytes int64 - hashedBytes int64 + // +checkatomic + hashedBytes int64 - readContents uint32 + // +checkatomic + readContents uint32 + // +checkatomic writtenContents uint32 - hashedContents uint32 + // +checkatomic + hashedContents uint32 + // +checkatomic invalidContents uint32 - validContents uint32 + // +checkatomic + validContents uint32 } // Reset clears all content statistics. @@ -38,17 +48,17 @@ func (s *Stats) Reset() { // ReadContent returns the approximate read content count and their total size in bytes. func (s *Stats) ReadContent() (count uint32, bytes int64) { - return readCountSum(&s.readContents, &s.readBytes) + return atomic.LoadUint32(&s.readContents), atomic.LoadInt64(&s.readBytes) } // WrittenContent returns the approximate written content count and their total size in bytes. func (s *Stats) WrittenContent() (count uint32, bytes int64) { - return readCountSum(&s.writtenContents, &s.writtenBytes) + return atomic.LoadUint32(&s.writtenContents), atomic.LoadInt64(&s.writtenBytes) } // HashedContent returns the approximate hashed content count and their total size in bytes. func (s *Stats) HashedContent() (count uint32, bytes int64) { - return readCountSum(&s.hashedContents, &s.hashedBytes) + return atomic.LoadUint32(&s.hashedContents), atomic.LoadInt64(&s.hashedBytes) } // DecryptedBytes returns the approximate total number of decrypted bytes. @@ -80,15 +90,15 @@ func (s *Stats) encrypted(size int) int64 { } func (s *Stats) readContent(size int) (count uint32, sum int64) { - return updateCountSum(&s.readContents, &s.readBytes, size) + return atomic.AddUint32(&s.readContents, 1), atomic.AddInt64(&s.readBytes, int64(size)) } func (s *Stats) wroteContent(size int) (count uint32, sum int64) { - return updateCountSum(&s.writtenContents, &s.writtenBytes, size) + return atomic.AddUint32(&s.writtenContents, 1), atomic.AddInt64(&s.writtenBytes, int64(size)) } func (s *Stats) hashedContent(size int) (count uint32, sum int64) { - return updateCountSum(&s.hashedContents, &s.hashedBytes, size) + return atomic.AddUint32(&s.hashedContents, 1), atomic.AddInt64(&s.hashedBytes, int64(size)) } func (s *Stats) foundValidContent() uint32 { @@ -98,11 +108,3 @@ func (s *Stats) foundValidContent() uint32 { func (s *Stats) foundInvalidContent() uint32 { return atomic.AddUint32(&s.invalidContents, 1) } - -func updateCountSum(count *uint32, sum *int64, delta int) (updatedCount uint32, updatedSum int64) { - return atomic.AddUint32(count, 1), atomic.AddInt64(sum, int64(delta)) -} - -func readCountSum(count *uint32, sum *int64) (c uint32, s int64) { - return atomic.LoadUint32(count), atomic.LoadInt64(sum) -} diff --git a/repo/grpc_repository_client.go b/repo/grpc_repository_client.go index 0190d2f55..cb994564f 100644 --- a/repo/grpc_repository_client.go +++ b/repo/grpc_repository_client.go @@ -59,13 +59,16 @@ type grpcRepositoryClient struct { conn *grpc.ClientConn innerSessionMutex sync.Mutex - innerSession *grpcInnerSession + + // +checklocks:innerSessionMutex + innerSession *grpcInnerSession opt WriteSessionOptions isReadOnly bool transparentRetries bool // how many times we tried to establish inner session + // +checklocks:innerSessionMutex innerSessionAttemptCount int asyncWritesSemaphore chan struct{} @@ -86,10 +89,15 @@ type grpcInnerSession struct { sendMutex sync.Mutex activeRequestsMutex sync.Mutex - nextRequestID int64 - activeRequests map[int64]chan *apipb.SessionResponse - cli apipb.KopiaRepository_SessionClient - repoParams *apipb.RepositoryParameters + + // +checklocks:activeRequestsMutex + nextRequestID int64 + + // +checklocks:activeRequestsMutex + activeRequests map[int64]chan *apipb.SessionResponse + + cli apipb.KopiaRepository_SessionClient + repoParams *apipb.RepositoryParameters } // readLoop runs in a goroutine and consumes all messages in session and forwards them to appropriate channels. @@ -152,6 +160,7 @@ func (r *grpcInnerSession) sendRequest(ctx context.Context, req *apipb.SessionRe return ch } +// +checklocks:r.activeRequestsMutex func (r *grpcInnerSession) getAndDeleteResponseChannelLocked(rid int64) chan *apipb.SessionResponse { ch := r.activeRequests[rid] delete(r.activeRequests, rid) diff --git a/repo/manifest/committed_manifest_manager.go b/repo/manifest/committed_manifest_manager.go index d872656af..5ad37d44e 100644 --- a/repo/manifest/committed_manifest_manager.go +++ b/repo/manifest/committed_manifest_manager.go @@ -21,12 +21,16 @@ type committedManifestManager struct { b contentManager - debugID string + debugID string // +checklocksignore - cmmu sync.Mutex - lastRevision int64 - locked bool - committedEntries map[ID]*manifestEntry + cmmu sync.Mutex + // +checklocks:cmmu + lastRevision int64 + // +checklocks:cmmu + locked bool + // +checklocks:cmmu + committedEntries map[ID]*manifestEntry + // +checklocks:cmmu committedContentIDs map[content.ID]bool } @@ -41,6 +45,7 @@ func (m *committedManifestManager) getCommittedEntryOrNil(ctx context.Context, i return m.committedEntries[id], nil } +// +checklocks:m.cmmu func (m *committedManifestManager) dump(ctx context.Context, prefix string) { if m.debugID == "" { return @@ -86,6 +91,7 @@ func (m *committedManifestManager) commitEntries(ctx context.Context, entries ma // NOTE: this function is used in two cases - to write pending entries (where the caller acquires // the lock via commitEntries()) and to compact existing committed entries during compaction // where the lock is already being held. +// +checklocks:m.cmmu func (m *committedManifestManager) writeEntriesLocked(ctx context.Context, entries map[ID]*manifestEntry) (map[content.ID]bool, error) { if len(entries) == 0 { return nil, nil @@ -120,6 +126,7 @@ func (m *committedManifestManager) writeEntriesLocked(ctx context.Context, entri return map[content.ID]bool{contentID: true}, nil } +// +checklocks:m.cmmu func (m *committedManifestManager) loadCommittedContentsLocked(ctx context.Context) error { m.verifyLocked() @@ -168,6 +175,7 @@ func (m *committedManifestManager) loadCommittedContentsLocked(ctx context.Conte return nil } +// +checklocks:m.cmmu func (m *committedManifestManager) loadManifestContentsLocked(manifests map[content.ID]manifest) { m.committedEntries = map[ID]*manifestEntry{} m.committedContentIDs = map[content.ID]bool{} @@ -197,6 +205,7 @@ func (m *committedManifestManager) compact(ctx context.Context) error { return m.compactLocked(ctx) } +// +checklocks:m.cmmu func (m *committedManifestManager) maybeCompactLocked(ctx context.Context) error { m.verifyLocked() @@ -217,6 +226,7 @@ func (m *committedManifestManager) maybeCompactLocked(ctx context.Context) error return nil } +// +checklocks:m.cmmu func (m *committedManifestManager) compactLocked(ctx context.Context) error { m.verifyLocked() @@ -258,6 +268,7 @@ func (m *committedManifestManager) compactLocked(ctx context.Context) error { return nil } +// +checklocks:m.cmmu func (m *committedManifestManager) mergeEntryLocked(e *manifestEntry) { m.verifyLocked() @@ -272,6 +283,7 @@ func (m *committedManifestManager) mergeEntryLocked(e *manifestEntry) { } } +// +checklocks:m.cmmu func (m *committedManifestManager) ensureInitializedLocked(ctx context.Context) error { rev := m.b.Revision() if m.lastRevision == rev { @@ -297,16 +309,19 @@ func (m *committedManifestManager) ensureInitializedLocked(ctx context.Context) return nil } +// +checklocksacquire:m.cmmu func (m *committedManifestManager) lock() { m.cmmu.Lock() m.locked = true } +// +checklocksrelease:m.cmmu func (m *committedManifestManager) unlock() { m.locked = false m.cmmu.Unlock() } +// +checklocks:m.cmmu func (m *committedManifestManager) verifyLocked() { if !m.locked { panic("not locked") diff --git a/repo/manifest/manifest_manager.go b/repo/manifest/manifest_manager.go index 43dc2ab34..86529d7ff 100644 --- a/repo/manifest/manifest_manager.go +++ b/repo/manifest/manifest_manager.go @@ -24,7 +24,7 @@ manifestIDLength = 16 ) -var log = logging.Module("kopia/manifest") +var log = logging.Module("kopia/manifest") // +checklocksignore // ErrNotFound is returned when the metadata item is not found. var ErrNotFound = errors.New("not found") @@ -57,6 +57,7 @@ type Manager struct { mu sync.Mutex b contentManager + // +checklocks:mu pendingEntries map[ID]*manifestEntry committed *committedManifestManager diff --git a/repo/object/object_manager_test.go b/repo/object/object_manager_test.go index 0b4bcf089..8c2030c47 100644 --- a/repo/object/object_manager_test.go +++ b/repo/object/object_manager_test.go @@ -32,9 +32,13 @@ var errSomeError = errors.Errorf("some error") type fakeContentManager struct { - mu sync.Mutex - data map[content.ID][]byte - compresionIDs map[content.ID]compression.HeaderID + mu sync.Mutex + + // +checklocks:mu + data map[content.ID][]byte + // +checklocks:mu + compresionIDs map[content.ID]compression.HeaderID + supportsContentCompression bool writeContentError error } diff --git a/repo/object/object_writer.go b/repo/object/object_writer.go index f68a6e40e..aafb5b62a 100644 --- a/repo/object/object_writer.go +++ b/repo/object/object_writer.go @@ -34,7 +34,8 @@ type Writer interface { } type contentIDTracker struct { - mu sync.Mutex + mu sync.Mutex + // +checklocks:mu contents map[content.ID]bool } diff --git a/repo/recently_read.go b/repo/recently_read.go index d458a139d..52b5f3e44 100644 --- a/repo/recently_read.go +++ b/repo/recently_read.go @@ -7,10 +7,13 @@ ) type recentlyRead struct { - mu sync.Mutex + mu sync.Mutex + // +checklocks:mu contentList []content.ID - next int - contentSet map[content.ID]struct{} + // +checklocks:mu + next int + // +checklocks:mu + contentSet map[content.ID]struct{} } func (r *recentlyRead) add(contentID content.ID) { diff --git a/snapshot/manifest.go b/snapshot/manifest.go index a8830ca15..2af71fe97 100644 --- a/snapshot/manifest.go +++ b/snapshot/manifest.go @@ -172,21 +172,27 @@ type StorageStats struct { // StorageUsageDetails provides details about snapshot storage usage. type StorageUsageDetails struct { // number of bytes in all objects (ignoring content-level deduplication). + // +checkatomic ObjectBytes int64 `json:"objectBytes"` // number of bytes in all unique contents (original). + // +checkatomic OriginalContentBytes int64 `json:"originalContentBytes"` // number of bytes in all unique contents as stored in the repository. + // +checkatomic PackedContentBytes int64 `json:"packedContentBytes"` // number of unique file objects. + // +checkatomic FileObjectCount int32 `json:"fileObjects"` // number of unique objects. + // +checkatomic DirObjectCount int32 `json:"dirObjects"` // number of unique contents. + // +checkatomic ContentCount int32 `json:"contents"` } diff --git a/snapshot/restore/restore.go b/snapshot/restore/restore.go index 363c437ff..664b4cda6 100644 --- a/snapshot/restore/restore.go +++ b/snapshot/restore/restore.go @@ -32,18 +32,29 @@ type Output interface { // Stats represents restore statistics. type Stats struct { + // +checkatomic RestoredTotalFileSize int64 + // +checkatomic EnqueuedTotalFileSize int64 - SkippedTotalFileSize int64 + // +checkatomic + SkippedTotalFileSize int64 - RestoredFileCount int32 - RestoredDirCount int32 + // +checkatomic + RestoredFileCount int32 + // +checkatomic + RestoredDirCount int32 + // +checkatomic RestoredSymlinkCount int32 - EnqueuedFileCount int32 - EnqueuedDirCount int32 + // +checkatomic + EnqueuedFileCount int32 + // +checkatomic + EnqueuedDirCount int32 + // +checkatomic EnqueuedSymlinkCount int32 - SkippedCount int32 - IgnoredErrorCount int32 + // +checkatomic + SkippedCount int32 + // +checkatomic + IgnoredErrorCount int32 } func (s *Stats) clone() Stats { diff --git a/snapshot/snapshotfs/checkpoint_registry.go b/snapshot/snapshotfs/checkpoint_registry.go index f66998117..d591cf4b4 100644 --- a/snapshot/snapshotfs/checkpoint_registry.go +++ b/snapshot/snapshotfs/checkpoint_registry.go @@ -17,6 +17,7 @@ type checkpointRegistry struct { mu sync.Mutex + // +checklocks:mu checkpoints map[string]checkpointFunc } diff --git a/snapshot/snapshotfs/estimate.go b/snapshot/snapshotfs/estimate.go index 2da218d16..4fbe92492 100644 --- a/snapshot/snapshotfs/estimate.go +++ b/snapshot/snapshotfs/estimate.go @@ -4,6 +4,7 @@ "context" "fmt" "path/filepath" + "sync/atomic" "github.com/kopia/kopia/fs" "github.com/kopia/kopia/fs/ignorefs" @@ -88,13 +89,13 @@ func Estimate(ctx context.Context, rep repo.Repository, entry fs.Directory, poli ed = append(ed, relativePath) } - stats.ExcludedDirCount++ + atomic.AddInt32(&stats.ExcludedDirCount, 1) estimateLog(ctx).Debugf("excluded dir %v", relativePath) } else { estimateLog(ctx).Debugf("excluded file %v (%v)", relativePath, units.BytesStringBase10(e.Size())) - stats.ExcludedFileCount++ - stats.ExcludedTotalFileSize += e.Size() + atomic.AddInt32(&stats.ExcludedFileCount, 1) + atomic.AddInt64(&stats.ExcludedTotalFileSize, e.Size()) eb.add(relativePath, e.Size(), maxExamplesPerBucket) } } @@ -116,7 +117,7 @@ func estimate(ctx context.Context, relativePath string, entry fs.Entry, policyTr switch entry := entry.(type) { case fs.Directory: - stats.TotalDirectoryCount++ + atomic.AddInt32(&stats.TotalDirectoryCount, 1) progress.Processing(ctx, relativePath) @@ -125,9 +126,9 @@ func estimate(ctx context.Context, relativePath string, entry fs.Entry, policyTr isIgnored := policyTree.EffectivePolicy().ErrorHandlingPolicy.IgnoreDirectoryErrors.OrDefault(false) if isIgnored { - stats.IgnoredErrorCount++ + atomic.AddInt32(&stats.IgnoredErrorCount, 1) } else { - stats.ErrorCount++ + atomic.AddInt32(&stats.ErrorCount, 1) } progress.Error(ctx, relativePath, err, isIgnored) @@ -143,8 +144,8 @@ func estimate(ctx context.Context, relativePath string, entry fs.Entry, policyTr case fs.File: ib.add(relativePath, entry.Size(), maxExamplesPerBucket) - stats.TotalFileCount++ - stats.TotalFileSize += entry.Size() + atomic.AddInt32(&stats.TotalFileCount, 1) + atomic.AddInt64(&stats.TotalFileSize, entry.Size()) } return nil diff --git a/snapshot/snapshotfs/snapshot_tree_walker.go b/snapshot/snapshotfs/snapshot_tree_walker.go index b8ce6fee1..a373caf93 100644 --- a/snapshot/snapshotfs/snapshot_tree_walker.go +++ b/snapshot/snapshotfs/snapshot_tree_walker.go @@ -26,9 +26,11 @@ type TreeWalker struct { enqueued sync.Map wp *workshare.Pool - mu sync.Mutex + mu sync.Mutex + // +checklocks:mu numErrors int - errors []error + // +checklocks:mu + errors []error } func oidOf(e fs.Entry) object.ID { diff --git a/snapshot/snapshotfs/upload.go b/snapshot/snapshotfs/upload.go index 4bb013c4e..c8180a7bc 100644 --- a/snapshot/snapshotfs/upload.go +++ b/snapshot/snapshotfs/upload.go @@ -60,6 +60,7 @@ // Uploader supports efficient uploading files and directories to repository. type Uploader struct { // values aligned to 8-bytes due to atomic access + // +checkatomic totalWrittenBytes int64 Progress UploadProgress @@ -94,7 +95,9 @@ type Uploader struct { repo repo.RepositoryWriter // stats must be allocated on heap to enforce 64-bit alignment due to atomic access on ARM. - stats *snapshot.Stats + stats *snapshot.Stats + + // +checkatomic canceled int32 getTicker func(time.Duration) <-chan time.Time @@ -542,7 +545,9 @@ func rootCauseError(err error) error { type dirManifestBuilder struct { mu sync.Mutex + // +checklocks:mu summary fs.DirectorySummary + // +checklocks:mu entries []*snapshot.DirEntry } @@ -615,7 +620,9 @@ func (b *dirManifestBuilder) Build(dirModTime time.Time, incompleteReason string s := b.summary s.TotalDirCount++ - if len(b.entries) == 0 { + entries := b.entries + + if len(entries) == 0 { s.MaxModTime = dirModTime } @@ -625,18 +632,18 @@ func (b *dirManifestBuilder) Build(dirModTime time.Time, incompleteReason string // sort the result, directories first, then non-directories, ordered by name sort.Slice(b.entries, func(i, j int) bool { - if leftDir, rightDir := isDir(b.entries[i]), isDir(b.entries[j]); leftDir != rightDir { + if leftDir, rightDir := isDir(entries[i]), isDir(entries[j]); leftDir != rightDir { // directories get sorted before non-directories return leftDir } - return b.entries[i].Name < b.entries[j].Name + return entries[i].Name < entries[j].Name }) return &snapshot.DirManifest{ StreamType: directoryStreamType, Summary: &s, - Entries: b.entries, + Entries: entries, } } @@ -1280,7 +1287,7 @@ func (u *Uploader) Upload( defer u.workerPool.Close() u.stats = &snapshot.Stats{} - u.totalWrittenBytes = 0 + atomic.StoreInt64(&u.totalWrittenBytes, 0) var err error diff --git a/snapshot/snapshotfs/upload_progress.go b/snapshot/snapshotfs/upload_progress.go index d14093285..c07d4f70c 100644 --- a/snapshot/snapshotfs/upload_progress.go +++ b/snapshot/snapshotfs/upload_progress.go @@ -95,21 +95,32 @@ func (p *NullUploadProgress) Error(path string, err error, isIgnored bool) {} // UploadCounters represents a snapshot of upload counters. type UploadCounters struct { - TotalCachedBytes int64 `json:"cachedBytes"` - TotalHashedBytes int64 `json:"hashedBytes"` + // +checkatomic + TotalCachedBytes int64 `json:"cachedBytes"` + // +checkatomic + TotalHashedBytes int64 `json:"hashedBytes"` + // +checkatomic TotalUploadedBytes int64 `json:"uploadedBytes"` + // +checkatomic EstimatedBytes int64 `json:"estimatedBytes"` + // +checkatomic TotalCachedFiles int32 `json:"cachedFiles"` + // +checkatomic TotalHashedFiles int32 `json:"hashedFiles"` + // +checkatomic TotalExcludedFiles int32 `json:"excludedFiles"` - TotalExcludedDirs int32 `json:"excludedDirs"` + // +checkatomic + TotalExcludedDirs int32 `json:"excludedDirs"` - FatalErrorCount int32 `json:"errors"` + // +checkatomic + FatalErrorCount int32 `json:"errors"` + // +checkatomic IgnoredErrorCount int32 `json:"ignoredErrors"` - EstimatedFiles int32 `json:"estimatedFiles"` + // +checkatomic + EstimatedFiles int32 `json:"estimatedFiles"` CurrentDirectory string `json:"directory"` @@ -175,9 +186,9 @@ func (p *CountingUploadProgress) Error(path string, err error, isIgnored bool) { defer p.mu.Unlock() if isIgnored { - p.counters.IgnoredErrorCount++ + atomic.AddInt32(&p.counters.IgnoredErrorCount, 1) } else { - p.counters.FatalErrorCount++ + atomic.AddInt32(&p.counters.FatalErrorCount, 1) } p.counters.LastErrorPath = path diff --git a/snapshot/snapshotfs/upload_scan.go b/snapshot/snapshotfs/upload_scan.go index 4c6da5a1e..7ca309cd1 100644 --- a/snapshot/snapshotfs/upload_scan.go +++ b/snapshot/snapshotfs/upload_scan.go @@ -2,6 +2,7 @@ import ( "context" + "sync/atomic" "github.com/kopia/kopia/fs" "github.com/kopia/kopia/snapshot" @@ -19,8 +20,8 @@ func (e *scanResults) Processing(ctx context.Context, pathname string) {} func (e *scanResults) Stats(ctx context.Context, s *snapshot.Stats, includedFiles, excludedFiles SampleBuckets, excludedDirs []string, final bool) { if final { - e.numFiles = int(s.TotalFileCount) - e.totalFileSize = s.TotalFileSize + e.numFiles = int(atomic.LoadInt32(&s.TotalFileCount)) + e.totalFileSize = atomic.LoadInt64(&s.TotalFileSize) } } diff --git a/snapshot/snapshotfs/upload_test.go b/snapshot/snapshotfs/upload_test.go index 073d11c20..6753383eb 100644 --- a/snapshot/snapshotfs/upload_test.go +++ b/snapshot/snapshotfs/upload_test.go @@ -8,6 +8,7 @@ "path/filepath" "reflect" "sort" + "sync/atomic" "testing" "time" @@ -147,16 +148,16 @@ func TestUpload(t *testing.T) { t.Errorf("expected s1.RootObjectID==s2.RootObjectID, got %v and %v", s1.RootObjectID().String(), s2.RootObjectID().String()) } - if got, want := s1.Stats.CachedFiles, int32(0); got != want { + if got, want := atomic.LoadInt32(&s1.Stats.CachedFiles), int32(0); got != want { t.Errorf("unexpected s1 cached files: %v, want %v", got, want) } // All non-cached files from s1 are now cached and there are no non-cached files since nothing changed. - if got, want := s2.Stats.CachedFiles, s1.Stats.NonCachedFiles; got != want { + if got, want := atomic.LoadInt32(&s2.Stats.CachedFiles), atomic.LoadInt32(&s1.Stats.NonCachedFiles); got != want { t.Errorf("unexpected s2 cached files: %v, want %v", got, want) } - if got, want := s2.Stats.NonCachedFiles, int32(0); got != want { + if got, want := atomic.LoadInt32(&s2.Stats.NonCachedFiles), int32(0); got != want { t.Errorf("unexpected non-cached files: %v", got) } @@ -172,7 +173,7 @@ func TestUpload(t *testing.T) { t.Errorf("expected s3.RootObjectID!=s2.RootObjectID, got %v", s3.RootObjectID().String()) } - if s3.Stats.NonCachedFiles != 1 { + if atomic.LoadInt32(&s3.Stats.NonCachedFiles) != 1 { // one file is not cached, which causes "./d2/d1/", "./d2/" and "./" to be changed. t.Errorf("unexpected s3 stats: %+v", s3.Stats) } @@ -190,7 +191,7 @@ func TestUpload(t *testing.T) { } // Everything is still cached. - if s4.Stats.CachedFiles != s1.Stats.NonCachedFiles || s4.Stats.NonCachedFiles != 0 { + if atomic.LoadInt32(&s4.Stats.CachedFiles) != atomic.LoadInt32(&s1.Stats.NonCachedFiles) || atomic.LoadInt32(&s4.Stats.NonCachedFiles) != 0 { t.Errorf("unexpected s4 stats: %+v", s4.Stats) } @@ -203,7 +204,7 @@ func TestUpload(t *testing.T) { t.Errorf("expected s4.RootObjectID==s5.RootObjectID, got %v and %v", s4.RootObjectID(), s5.RootObjectID()) } - if s5.Stats.NonCachedFiles != 0 { + if atomic.LoadInt32(&s5.Stats.NonCachedFiles) != 0 { // no files are changed, but one file disappeared which caused "./d2/d1/", "./d2/" and "./" to be changed. t.Errorf("unexpected s5 stats: %+v", s5.Stats) } @@ -694,21 +695,21 @@ func TestUpload_VirtualDirectoryWithStreamingFile(t *testing.T) { t.Fatalf("Upload error: %v", err) } - if got, want := man.Stats.CachedFiles, int32(0); got != want { + if got, want := atomic.LoadInt32(&man.Stats.CachedFiles), int32(0); got != want { t.Fatalf("unexpected manifest cached files: %v, want %v", got, want) } - if got, want := man.Stats.NonCachedFiles, int32(1); got != want { + if got, want := atomic.LoadInt32(&man.Stats.NonCachedFiles), int32(1); got != want { // one file is not cached t.Fatalf("unexpected manifest non-cached files: %v, want %v", got, want) } - if got, want := man.Stats.TotalDirectoryCount, int32(1); got != want { + if got, want := atomic.LoadInt32(&man.Stats.TotalDirectoryCount), int32(1); got != want { // must have one directory t.Fatalf("unexpected manifest directory count: %v, want %v", got, want) } - if got, want := man.Stats.TotalFileCount, int32(1); got != want { + if got, want := atomic.LoadInt32(&man.Stats.TotalFileCount), int32(1); got != want { // must have one file t.Fatalf("unexpected manifest file count: %v, want %v", got, want) } diff --git a/tests/robustness/checker/checker.go b/tests/robustness/checker/checker.go index 011037c66..b04c8409e 100644 --- a/tests/robustness/checker/checker.go +++ b/tests/robustness/checker/checker.go @@ -38,7 +38,7 @@ type Checker struct { DeleteLimit int mu sync.RWMutex - SnapIDIndex snapmeta.Index + SnapIDIndex snapmeta.Index // +checklocksignore } // NewChecker instantiates a new Checker, returning its pointer. A temporary diff --git a/tests/robustness/multiclient_test/framework/filewriter.go b/tests/robustness/multiclient_test/framework/filewriter.go index dad591926..8ae554924 100644 --- a/tests/robustness/multiclient_test/framework/filewriter.go +++ b/tests/robustness/multiclient_test/framework/filewriter.go @@ -17,8 +17,10 @@ // delegate to a specific client FileWriter. type MultiClientFileWriter struct { // Map of client ID to FileWriter and associated mutex + mu sync.RWMutex + + // +checklocks:mu fileWriters map[string]FileWriter - mu sync.RWMutex // Function used to generate new FileWriters newFileWriter newFileWriterFn diff --git a/tests/robustness/pathlock/path_lock.go b/tests/robustness/pathlock/path_lock.go index 7bf61b1e5..0a0ad7677 100644 --- a/tests/robustness/pathlock/path_lock.go +++ b/tests/robustness/pathlock/path_lock.go @@ -36,7 +36,9 @@ type Unlocker interface { // that has already been Locked. The thread will be blocked until the holder // of the lock calls Unlock. type pathLock struct { - mu sync.Mutex + mu sync.Mutex + + // +checklocks:mu lockedPaths map[string]chan struct{} } diff --git a/tests/tools/kopiaclient/kopiaclient.go b/tests/tools/kopiaclient/kopiaclient.go index e573f9384..dcfb835c3 100644 --- a/tests/tools/kopiaclient/kopiaclient.go +++ b/tests/tools/kopiaclient/kopiaclient.go @@ -11,6 +11,7 @@ "log" "os" "path/filepath" + "sync/atomic" "github.com/pkg/errors" @@ -99,7 +100,7 @@ func (kc *KopiaClient) SnapshotCreate(ctx context.Context, key string, val []byt return errors.Wrap(err, "cannot get manifest") } - log.Printf("snapshotting %v", units.BytesStringBase10(man.Stats.TotalFileSize)) + log.Printf("snapshotting %v", units.BytesStringBase10(atomic.LoadInt64(&man.Stats.TotalFileSize))) if _, err := snapshot.SaveSnapshot(ctx, rw, man); err != nil { return errors.Wrap(err, "cannot save snapshot") diff --git a/tools/tools.mk b/tools/tools.mk index f04ee69de..9f5f35d69 100644 --- a/tools/tools.mk +++ b/tools/tools.mk @@ -103,6 +103,7 @@ endif # tool versions GOLANGCI_LINT_VERSION=1.45.0 +CHECKLOCKS_VERSION=release-20220314.0 NODE_VERSION=16.13.0 HUGO_VERSION=0.89.2 GOTESTSUM_VERSION=1.7.0 @@ -149,6 +150,14 @@ endif $(linter): go run github.com/kopia/kopia/tools/gettool --tool linter:$(GOLANGCI_LINT_VERSION) --output-dir $(linter_dir) +# checklocks +checklocks_dir=$(TOOLS_DIR)$(slash)checklocks-$(CHECKLOCKS_VERSION) +checklocks=$(checklocks_dir)$(slash)bin$(slash)checklocks$(exe_suffix) + +$(checklocks): export GOPATH=$(checklocks_dir) +$(checklocks): + go install gvisor.dev/gvisor/tools/checklocks/cmd/checklocks@$(CHECKLOCKS_VERSION) + # hugo hugo_dir=$(TOOLS_DIR)$(slash)hugo-$(HUGO_VERSION) hugo=$(hugo_dir)/hugo$(exe_suffix)