chore(ci): added checklocks static analyzer (#1838)

From https://github.com/google/gvisor/tree/master/tools/checklocks

This will perform static verification that we're using
`sync.Mutex`, `sync.RWMutex` and `atomic` correctly to guard access
to certain fields.

This was mostly just a matter of adding annotations to indicate which
fields are guarded by which mutex.

In a handful of places the code had to be refactored to allow static
analyzer to do its job better or to not be confused by some
constructs.

In one place this actually uncovered a bug where a function was not
releasing a lock properly in an error case.

The check is part of `make lint` but can also be invoked by
`make check-locks`.
This commit is contained in:
Jarek Kowalski
2022-03-19 22:42:59 -07:00
committed by GitHub
parent 9cba4a97be
commit daa62de3e4
63 changed files with 572 additions and 271 deletions

View File

@@ -64,7 +64,14 @@ install-noui: install
install-race:
go install -race $(KOPIA_BUILD_FLAGS) -tags "$(KOPIA_BUILD_TAGS)"
lint: $(linter)
check-locks: $(checklocks)
ifneq ($(GOOS)/$(GOARCH),linux/arm64)
ifneq ($(GOOS)/$(GOARCH),linux/arm)
go vet -vettool=$(checklocks) ./...
endif
endif
lint: $(linter) check-locks
ifneq ($(GOOS)/$(GOARCH),linux/arm64)
ifneq ($(GOOS)/$(GOARCH),linux/arm)
$(linter) --deadline $(LINTER_DEADLINE) run $(linter_flags)
@@ -171,7 +178,7 @@ download-rclone:
go run ./tools/gettool --tool rclone:$(RCLONE_VERSION) --output-dir dist/kopia_linux_arm_6/ --goos=linux --goarch=arm
ci-tests: lint vet test
ci-tests: lint vet test
ci-integration-tests:
$(MAKE) integration-tests

View File

@@ -35,33 +35,47 @@ type cliProgress struct {
snapshotfs.NullUploadProgress
// all int64 must precede all int32 due to alignment requirements on ARM
uploadedBytes int64
cachedBytes int64
// +checkatomic
uploadedBytes int64
// +checkatomic
cachedBytes int64
// +checkatomic
hashedBytes int64
outputThrottle timetrack.Throttle // is int64
cachedFiles int32
// +checkatomic
cachedFiles int32
// +checkatomic
inProgressHashing int32
hashedFiles int32
uploadedFiles int32
// +checkatomic
hashedFiles int32
// +checkatomic
uploadedFiles int32
// +checkatomic
ignoredErrorCount int32
fatalErrorCount int32
// +checkatomic
fatalErrorCount int32
uploading int32
// +checkatomic
uploading int32
// +checkatomic
uploadFinished int32
lastLineLength int
spinPhase int
uploadStartTime timetrack.Estimator
outputMutex sync.Mutex
estimatedFileCount int
estimatedTotalBytes int64
// +checklocks:outputMutex
lastLineLength int
// +checklocks:outputMutex
spinPhase int
uploadStartTime timetrack.Estimator // +checklocksignore
estimatedFileCount int // +checklocksignore
estimatedTotalBytes int64 // +checklocksignore
// indicates shared instance that does not reset counters at the beginning of upload.
shared bool
outputMutex sync.Mutex
progressFlags
}
@@ -181,6 +195,7 @@ func (p *cliProgress) output(col *color.Color, msg string) {
p.out.printStderr("\r%v%v", line, extraSpaces)
}
// +checklocks:p.outputMutex
func (p *cliProgress) spinnerCharacter() string {
if atomic.LoadInt32(&p.uploadFinished) == 1 {
return "*"
@@ -193,6 +208,7 @@ func (p *cliProgress) spinnerCharacter() string {
return s
}
// +checklocksignore.
func (p *cliProgress) StartShared() {
*p = cliProgress{
uploading: 1,
@@ -207,6 +223,7 @@ func (p *cliProgress) FinishShared() {
p.output(defaultColor, "")
}
// +checklocksignore.
func (p *cliProgress) UploadStarted() {
if p.shared {
// do nothing

View File

@@ -27,7 +27,9 @@ func (c *commandContentStats) setup(svc appServices, parent commandParent) {
}
type contentStatsTotals struct {
originalSize, packedSize, count int64
originalSize int64
packedSize int64
count int64
}
func (c *commandContentStats) run(ctx context.Context, rep repo.DirectRepository) error {

View File

@@ -5,6 +5,7 @@
"fmt"
"path/filepath"
"strings"
"sync/atomic"
"time"
"github.com/fatih/color"
@@ -276,7 +277,7 @@ func (c *commandSnapshotList) outputManifestFromSingleSource(ctx context.Context
})
if m.IncompleteReason == "" {
lastTotalFileSize = m.Stats.TotalFileSize
lastTotalFileSize = atomic.LoadInt64(&m.Stats.TotalFileSize)
}
return nil
@@ -390,10 +391,10 @@ func (c *commandSnapshotList) entryBits(ctx context.Context, m *snapshot.Manifes
if u := m.StorageStats; u != nil {
bits = append(bits,
fmt.Sprintf("new-data:%v", units.BytesStringBase10(u.NewData.PackedContentBytes)),
fmt.Sprintf("new-files:%v", int64(u.NewData.FileObjectCount)),
fmt.Sprintf("new-dirs:%v", int64(u.NewData.DirObjectCount)),
fmt.Sprintf("compression:%v", formatCompressionPercentage(u.NewData.OriginalContentBytes, u.NewData.PackedContentBytes)),
fmt.Sprintf("new-data:%v", units.BytesStringBase10(atomic.LoadInt64(&u.NewData.PackedContentBytes))),
fmt.Sprintf("new-files:%v", atomic.LoadInt32(&u.NewData.FileObjectCount)),
fmt.Sprintf("new-dirs:%v", atomic.LoadInt32(&u.NewData.DirObjectCount)),
fmt.Sprintf("compression:%v", formatCompressionPercentage(atomic.LoadInt64(&u.NewData.OriginalContentBytes), atomic.LoadInt64(&u.NewData.PackedContentBytes))),
)
}

View File

@@ -58,8 +58,10 @@ func (c *commandSnapshotVerify) setup(svc appServices, parent commandParent) {
}
type verifier struct {
throttle timetrack.Throttle
queued int32
throttle timetrack.Throttle
// +checkatomic
queued int32
// +checkatomic
processed int32
fileWorkItems chan verifyFileWorkItem
@@ -80,7 +82,9 @@ func (v *verifier) showStats(ctx context.Context) {
func (v *verifier) verifyFile(ctx context.Context, oid object.ID, entryPath string) error {
log(ctx).Debugf("verifying object %v", oid)
defer atomic.AddInt32(&v.processed, 1)
defer func() {
atomic.AddInt32(&v.processed, 1)
}()
contentIDs, err := v.rep.VerifyObject(ctx, oid)
if err != nil {

View File

@@ -141,15 +141,18 @@ func (cv *cacheVerifier) reset() {
}
type lockState struct {
l sync.Locker
l sync.Locker
// +checkatomic
locked int32
}
// +checklocksacquire:ls.l
func (ls *lockState) Lock() {
ls.l.Lock()
atomic.AddInt32(&ls.locked, 1)
}
// +checklocksrelease:ls.l
func (ls *lockState) Unlock() {
atomic.AddInt32(&ls.locked, -1)
ls.l.Unlock()

View File

@@ -13,11 +13,14 @@
const defaultProfileRefreshFrequency = 10 * time.Second
type repositoryUserAuthenticator struct {
mu sync.Mutex
// +checklocks:mu
lastRep repo.Repository
mu sync.Mutex
nextRefreshTime time.Time
userProfiles map[string]*user.Profile
// +checklocks:mu
nextRefreshTime time.Time
// +checklocks:mu
userProfiles map[string]*user.Profile
// +checklocks:mu
userProfileRefreshFrequency time.Duration
}

View File

@@ -86,12 +86,15 @@
}
type aclCache struct {
aclRefreshFrequency time.Duration
aclRefreshFrequency time.Duration // +checklocksignore
mu sync.Mutex
lastRep repo.Repository
mu sync.Mutex
// +checklocks:mu
lastRep repo.Repository
// +checklocks:mu
nextRefreshTime time.Time
aclEntries []*acl.Entry
// +checklocks:mu
aclEntries []*acl.Entry
}
// Authorize returns authorization info based on ACLs stored in the repository falling back to legacy authorizer

View File

@@ -88,6 +88,7 @@ type eventuallyConsistentStorage struct {
recentlyDeleted sync.Map
listSettleTime time.Duration
// +checklocks:mu
caches []*ecFrontendCache
realStorage blob.Storage
timeNow func() time.Time

View File

@@ -18,8 +18,11 @@
type DataMap map[blob.ID][]byte
type mapStorage struct {
data DataMap
// +checklocks:mutex
data DataMap
// +checklocks:mutex
keyTime map[blob.ID]time.Time
// +checklocks:mutex
timeNow func() time.Time
mutex sync.RWMutex
}

View File

@@ -29,11 +29,13 @@ type entry struct {
// marker. This struct manages the retention time of each blob throug hte
// PutBlob options.
type objectLockingMap struct {
// +checklocks:mutex
data versionedEntries
timeNow func() time.Time
timeNow func() time.Time // +checklocksignore
mutex sync.RWMutex
}
// +checklocksread:s.mutex
func (s *objectLockingMap) getLatestByID(id blob.ID) (*entry, error) {
versions, ok := s.data[id]
if !ok {
@@ -50,6 +52,7 @@ func (s *objectLockingMap) getLatestByID(id blob.ID) (*entry, error) {
return e, nil
}
// +checklocksread:s.mutex
func (s *objectLockingMap) getLatestForMutationLocked(id blob.ID) (*entry, error) {
e, err := s.getLatestByID(id)
if err != nil {

View File

@@ -279,8 +279,10 @@ func testGetContentRaceFetchesOnce(t *testing.T, newCache newContentCacheFunc) {
type concurrencyTester struct {
mu sync.Mutex
concurrencyLevel int
maxConcurrencyLevel int
// +checklocks:mu
concurrencyLevel int
maxConcurrencyLevel int // +checklocksignore
}
func (c *concurrencyTester) enter() {

View File

@@ -37,6 +37,7 @@
// PersistentCache provides persistent on-disk cache.
type PersistentCache struct {
// +checkatomic
anyChange int32
cacheStorage Storage

View File

@@ -31,7 +31,8 @@ type ConnectorImpl interface {
type Reconnector struct {
connector ConnectorImpl
mu sync.Mutex
mu sync.Mutex
// +checklocks:mu
activeConnection Connection
}

View File

@@ -21,7 +21,9 @@
)
type fakeConnector struct {
nextConnectionID int32
// +checkatomic
nextConnectionID int32
maxConnections int
connectionConcurrency int
nextError error

View File

@@ -27,6 +27,7 @@ func AutoAdvance(t time.Time, dt time.Duration) func() time.Time {
// TimeAdvance allows controlling the passage of time. Intended to be used in
// tests.
type TimeAdvance struct {
// +checkatomic
delta int64
autoDt int64
base time.Time
@@ -60,7 +61,8 @@ func (t *TimeAdvance) Advance(dt time.Duration) time.Time {
// ClockTimeWithOffset allows controlling the passage of time. Intended to be used in
// tests.
type ClockTimeWithOffset struct {
mu sync.Mutex
mu sync.Mutex
// +checklocks:mu
offset time.Duration
}

View File

@@ -66,9 +66,13 @@ func (f *fuseFileNode) Open(ctx context.Context, flags uint32) (gofusefs.FileHan
}
type fuseFileHandle struct {
mu sync.Mutex
mu sync.Mutex
// +checklocks:mu
reader fs.Reader
file fs.File
// +checklocks:mu
file fs.File
}
func (f *fuseFileHandle) Read(ctx context.Context, dest []byte, off int64) (fuse.ReadResult, syscall.Errno) {
@@ -93,6 +97,9 @@ func (f *fuseFileHandle) Read(ctx context.Context, dest []byte, off int64) (fuse
}
func (f *fuseFileHandle) Release(ctx context.Context) syscall.Errno {
f.mu.Lock()
defer f.mu.Unlock()
f.reader.Close() //nolint:errcheck
return gofusefs.OK

View File

@@ -7,7 +7,7 @@
"github.com/kopia/kopia/repo/logging"
)
var log = logging.Module("gather")
var log = logging.Module("gather") // +checklocksignore
// WriteBuffer is a write buffer for content of unknown size that manages
// data in a series of byte slices of uniform size.

View File

@@ -45,17 +45,28 @@ type chunkAllocator struct {
name string
chunkSize int
mu sync.Mutex
freeList [][]byte
maxFreeListSize int
mu sync.Mutex
// +checklocks:mu
freeList [][]byte
// +checklocks:mu
maxFreeListSize int
// +checklocks:mu
freeListHighWaterMark int
allocHighWaterMark int
allocated int
slicesAllocated int
freed int
activeChunks map[uintptr]string
// +checklocks:mu
allocHighWaterMark int
// +checklocks:mu
allocated int
// +checklocks:mu
slicesAllocated int
// +checklocks:mu
freed int
// +checklocks:mu
activeChunks map[uintptr]string
}
// +checklocks:a.mu
func (a *chunkAllocator) trackAlloc(v []byte) []byte {
if trackChunkAllocations {
var (

View File

@@ -9,8 +9,10 @@
const bufSize = 65536
var (
mu sync.Mutex //nolint:gochecknoglobals
buffers [][]byte //nolint:gochecknoglobals
mu sync.Mutex //nolint:gochecknoglobals
// +checklocks:mu
buffers [][]byte //nolint:gochecknoglobals
)
// GetBuffer allocates new temporary buffer suitable for copying data.

View File

@@ -373,14 +373,26 @@ func logLevelFromFlag(levelString string) zapcore.LevelEnabler {
}
type onDemandFile struct {
segmentCounter int // number of segments written
currentSegmentSize int // number of bytes written to current segment
maxSegmentSize int
// +checklocks:mu
segmentCounter int // number of segments written
// +checklocks:mu
currentSegmentSize int // number of bytes written to current segment
// +checklocks:mu
maxSegmentSize int
// +checklocks:mu
currentSegmentFilename string
logDir string
// +checklocks:mu
logDir string
// +checklocks:mu
logFileBaseName string
symlinkName string
// +checklocks:mu
symlinkName string
startSweep func()

View File

@@ -10,14 +10,27 @@
"github.com/kopia/kopia/repo/logging"
)
var log = logging.Module("memtrack")
var log = logging.Module("memtrack") // +checklocksignore
type tracker struct {
name string
memoryTrackerMutex sync.Mutex
initialMemStats runtime.MemStats
previousMemStats runtime.MemStats
maxAlloc, maxHeapUsage, maxStackInUse uint64
name string
memoryTrackerMutex sync.Mutex
// +checklocks:memoryTrackerMutex
initialMemStats runtime.MemStats
// +checklocks:memoryTrackerMutex
previousMemStats runtime.MemStats
// +checklocks:memoryTrackerMutex
maxAlloc uint64
// +checklocks:memoryTrackerMutex
maxHeapUsage uint64
// +checklocks:memoryTrackerMutex
maxStackInUse uint64
}
func (c *tracker) dump(ctx context.Context, desc string) {

View File

@@ -39,7 +39,9 @@ type CacheStorage struct {
prefixes []blob.ID
cacheDuration time.Duration
mu sync.Mutex
mu sync.Mutex
// +checklocks:mu
nextSweepTime time.Time
}

View File

@@ -221,9 +221,12 @@ type concurrencyTest struct {
prefix blob.ID
deadline time.Time
mu sync.Mutex
blobData map[blob.ID][]byte
blobIDs []blob.ID
mu sync.Mutex
// +checklocks:mu
blobData map[blob.ID][]byte
// +checklocks:mu
blobIDs []blob.ID
// +checklocks:mu
blobWritten map[blob.ID]bool
}

View File

@@ -9,7 +9,9 @@
// CountSum holds sum and count values.
type CountSum struct {
sum int64
// +checkatomic
sum int64
// +checkatomic
count uint32
}

View File

@@ -11,8 +11,10 @@
// CountSum holds sum and count values.
type CountSum struct {
mu sync.Mutex
sum int64
mu sync.Mutex
// +checklocks:mu
sum int64
// +checklocks:mu
count uint32
}

View File

@@ -73,9 +73,11 @@ type Info struct {
type runningTaskInfo struct {
Info
mu sync.Mutex
maxLogMessages int
taskCancel []context.CancelFunc
maxLogMessages int // +checklocksignore
mu sync.Mutex
// +checklocks:mu
taskCancel []context.CancelFunc
}
// CurrentTaskID implements the Controller interface.

View File

@@ -21,13 +21,16 @@
// Manager manages UI tasks.
type Manager struct {
mu sync.Mutex
mu sync.Mutex
// +checklocks:mu
nextTaskID int
running map[string]*runningTaskInfo
finished map[string]*Info
// +checklocks:mu
running map[string]*runningTaskInfo
// +checklocks:mu
finished map[string]*Info
MaxFinishedTasks int
MaxLogMessagesPerTask int
MaxFinishedTasks int // +checklocksignore
MaxLogMessagesPerTask int // +checklocksignore
}
// Controller allows the task to communicate with task manager and receive signals.

View File

@@ -25,12 +25,15 @@
type webdavFile struct {
// webdavFile implements webdav.File but needs context
ctx context.Context //nolint:containedctx
// +checklocks:mu
ctx context.Context // nolint:containedctx
entry fs.File
mu sync.Mutex
r fs.Reader
// +checklocks:mu
r fs.Reader
}
func (f *webdavFile) Readdir(n int) ([]os.FileInfo, error) {

View File

@@ -19,6 +19,7 @@ type workItem struct {
// Pool manages a pool of generic workers that can process workItem.
type Pool struct {
// +checkatomic
activeWorkers int32
semaphore chan struct{}

View File

@@ -13,21 +13,36 @@
var errNonRetriable = errors.Errorf("some non-retriable error")
type mockOS struct {
readFileRemainingErrors int32
writeFileRemainingErrors int32
writeFileCloseRemainingErrors int32
createNewFileRemainingErrors int32
mkdirAllRemainingErrors int32
renameRemainingErrors int32
removeRemainingRetriableErrors int32
removeRemainingNonRetriableErrors int32
chownRemainingErrors int32
readDirRemainingErrors int32
readDirRemainingNonRetriableErrors int32
// +checkatomic
readFileRemainingErrors int32
// +checkatomic
writeFileRemainingErrors int32
// +checkatomic
writeFileCloseRemainingErrors int32
// +checkatomic
createNewFileRemainingErrors int32
// +checkatomic
mkdirAllRemainingErrors int32
// +checkatomic
renameRemainingErrors int32
// +checkatomic
removeRemainingRetriableErrors int32
// +checkatomic
removeRemainingNonRetriableErrors int32
// +checkatomic
chownRemainingErrors int32
// +checkatomic
readDirRemainingErrors int32
// +checkatomic
readDirRemainingNonRetriableErrors int32
// +checkatomic
readDirRemainingFileDeletedDirEntry int32
readDirRemainingFatalDirEntry int32
statRemainingErrors int32
chtimesRemainingErrors int32
// +checkatomic
readDirRemainingFatalDirEntry int32
// +checkatomic
statRemainingErrors int32
// +checkatomic
chtimesRemainingErrors int32
effectiveUID int

View File

@@ -14,14 +14,17 @@
type fileIDCache struct {
// Map of blobID -> *cacheEntry.
Blobs sync.Map
// Guards access to ChangeLog.
mu sync.RWMutex
// Record of recent cache changes.
// It's stored as a synchronized circular buffer.
ChangeLog [changeLogCacheSize]changeEntry
// ChangeLogIdx indicates the next location to write to.
// The log entry range is [ChangeLogIdx+1, ChangeLogIdx-1].
ChangeLogIdx int
// Guards access to ChangeLog.
ChangeLogMut sync.RWMutex
// +checklocks:mu
changeLog [changeLogCacheSize]changeEntry
// changeLogIdx indicates the next location to write to.
// The log entry range is [changeLogIdx+1, changeLogIdx-1].
// +checklocks:mu
changeLogIdx int
}
// cacheEntry is a blob cache entry.
@@ -79,36 +82,39 @@ func (cache *fileIDCache) BlindPut(blobID blob.ID, fileID string) {
// RecordBlobChange records a newly created or deleted blob.
// An empty fileID signals that the blob is deleted.
func (cache *fileIDCache) RecordBlobChange(blobID blob.ID, fileID string) {
cache.ChangeLogMut.Lock()
cache.mu.Lock()
i := cache.ChangeLogIdx
cache.ChangeLog[i] = changeEntry{
i := cache.changeLogIdx
cache.changeLog[i] = changeEntry{
BlobID: blobID,
FileID: fileID,
}
cache.ChangeLogIdx = circularBufferNext(i)
cache.changeLogIdx = circularBufferNext(i)
cache.ChangeLogMut.Unlock()
cache.mu.Unlock()
}
// VisitBlobChanges iterates through newly created or deleted blobs.
func (cache *fileIDCache) VisitBlobChanges(callback func(blobID blob.ID, fileID string)) {
cache.ChangeLogMut.RLock()
cache.mu.RLock()
for i := circularBufferNext(cache.ChangeLogIdx); i != cache.ChangeLogIdx; i = circularBufferNext(i) {
entry := cache.ChangeLog[i]
for i := circularBufferNext(cache.changeLogIdx); i != cache.changeLogIdx; i = circularBufferNext(i) {
entry := cache.changeLog[i]
if entry.BlobID != "" {
callback(entry.BlobID, entry.FileID)
}
}
cache.ChangeLogMut.RUnlock()
cache.mu.RUnlock()
}
// Clear resets the file ID cache.
func (cache *fileIDCache) Clear() {
cache.mu.Lock()
defer cache.mu.Unlock()
cache.Blobs = sync.Map{}
cache.ChangeLog = [changeLogCacheSize]changeEntry{}
cache.changeLog = [changeLogCacheSize]changeEntry{}
}
func circularBufferNext(curr int) int {
@@ -123,8 +129,8 @@ func circularBufferNext(curr int) int {
func newFileIDCache() *fileIDCache {
return &fileIDCache{
Blobs: sync.Map{},
ChangeLog: [changeLogCacheSize]changeEntry{},
ChangeLogIdx: 0,
ChangeLogMut: sync.RWMutex{},
changeLog: [changeLogCacheSize]changeEntry{},
changeLogIdx: 0,
mu: sync.RWMutex{},
}
}

View File

@@ -11,7 +11,9 @@
)
type loggingStorage struct {
concurrency int32
// +checkatomic
concurrency int32
// +checkatomic
maxConcurrency int32
base blob.Storage

View File

@@ -736,6 +736,7 @@ func createMinioSessionToken(t *testing.T, minioEndpoint, kopiaUserName, kopiaUs
// provider to force expiration of the credentials. This causes
// the next call to Retrieve to return expired credentials.
type customProvider struct {
// +checkatomic
forceExpired atomic.Value
stsProvider miniocreds.STSAssumeRole
}

View File

@@ -20,7 +20,7 @@
// CompleteBlobSuffix is the extension for sharded blobs that have completed writing.
const CompleteBlobSuffix = ".f"
var log = logging.Module("sharded")
var log = logging.Module("sharded") // +checklocksignore
// Impl must be implemented by underlying provided.
type Impl interface {
@@ -39,7 +39,9 @@ type Storage struct {
Options
parametersMutex sync.Mutex
parameters *Parameters
// +checklocks:parametersMutex
parameters *Parameters
}
// GetBlob implements blob.Storage.

View File

@@ -17,7 +17,8 @@ type SettableThrottler interface {
}
type tokenBucketBasedThrottler struct {
mu sync.Mutex
mu sync.Mutex
// +checklocks:mu
limits Limits
readOps *tokenBucket
@@ -25,7 +26,7 @@ type tokenBucketBasedThrottler struct {
listOps *tokenBucket
upload *tokenBucket
download *tokenBucket
window time.Duration
window time.Duration // +checklocksignore
}
func (t *tokenBucketBasedThrottler) BeforeOperation(ctx context.Context, op string) {

View File

@@ -24,13 +24,17 @@
const smallIndexEntryCountThreshold = 100
type committedContentIndex struct {
// +checkatomic
rev int64
cache committedContentIndexCache
mu sync.Mutex
mu sync.Mutex
// +checklocks:mu
deletionWatermark time.Time
inUse map[blob.ID]packIndex
merged mergedIndex
// +checklocks:mu
inUse map[blob.ID]packIndex
// +checklocks:mu
merged mergedIndex
v1PerContentOverhead uint32
indexVersion int
@@ -58,7 +62,7 @@ func (c *committedContentIndex) getContent(contentID ID) (Info, error) {
info, err := c.merged.GetInfo(contentID)
if info != nil {
if c.shouldIgnore(info) {
if shouldIgnore(info, c.deletionWatermark) {
return nil, ErrContentNotFound
}
@@ -72,12 +76,12 @@ func (c *committedContentIndex) getContent(contentID ID) (Info, error) {
return nil, err
}
func (c *committedContentIndex) shouldIgnore(id Info) bool {
func shouldIgnore(id Info, deletionWatermark time.Time) bool {
if !id.GetDeleted() {
return false
}
return !id.Timestamp().After(c.deletionWatermark)
return !id.Timestamp().After(deletionWatermark)
}
func (c *committedContentIndex) addIndexBlob(ctx context.Context, indexBlobID blob.ID, data gather.Bytes, use bool) error {
@@ -85,7 +89,9 @@ func (c *committedContentIndex) addIndexBlob(ctx context.Context, indexBlobID bl
// doing it prematurely might confuse callers of revision() who may cache
// a set of old contents and associate it with new revision, before new contents
// are actually available.
defer atomic.AddInt64(&c.rev, 1)
defer func() {
atomic.AddInt64(&c.rev, 1)
}()
if err := c.cache.addContentToCache(ctx, indexBlobID, data); err != nil {
return errors.Wrap(err, "error adding content to cache")
@@ -118,10 +124,11 @@ func (c *committedContentIndex) addIndexBlob(ctx context.Context, indexBlobID bl
func (c *committedContentIndex) listContents(r IDRange, cb func(i Info) error) error {
c.mu.Lock()
m := append(mergedIndex(nil), c.merged...)
deletionWatermark := c.deletionWatermark
c.mu.Unlock()
return m.Iterate(r, func(i Info) error {
if c.shouldIgnore(i) {
if shouldIgnore(i, deletionWatermark) {
return nil
}
@@ -129,6 +136,7 @@ func (c *committedContentIndex) listContents(r IDRange, cb func(i Info) error) e
})
}
// +checklocks:c.mu
func (c *committedContentIndex) indexFilesChanged(indexFiles []blob.ID) bool {
if len(indexFiles) != len(c.inUse) {
return true

View File

@@ -12,9 +12,12 @@
)
type memoryCommittedContentIndexCache struct {
mu sync.Mutex
contents map[blob.ID]packIndex
v1PerContentOverhead uint32
mu sync.Mutex
// +checklocks:mu
contents map[blob.ID]packIndex
v1PerContentOverhead uint32 // +checklocksignore
}
func (m *memoryCommittedContentIndexCache) hasIndexBlobID(ctx context.Context, indexBlobID blob.ID) (bool, error) {

View File

@@ -71,8 +71,10 @@ type indexBlobManager interface {
// SharedManager is responsible for read-only access to committed data.
type SharedManager struct {
// +checkatomic
refCount int32 // number of Manager objects that refer to this SharedManager
closed int32 // set to 1 if shared manager has been closed
// +checkatomic
closed int32 // set to 1 if shared manager has been closed
Stats *Stats
st blob.Storage
@@ -94,6 +96,7 @@ type SharedManager struct {
indexesLock sync.RWMutex
// maybeRefreshIndexes() will call Refresh() after this point in ime.
// +checklocks:indexesLock
refreshIndexesAfter time.Time
format FormattingOptions
@@ -182,6 +185,7 @@ func (sm *SharedManager) attemptReadPackFileLocalIndex(ctx context.Context, pack
"unable to decrypt local index")
}
// +checklocks:sm.indexesLock
func (sm *SharedManager) loadPackIndexesLocked(ctx context.Context) error {
nextSleepTime := 100 * time.Millisecond //nolint:gomnd
@@ -607,6 +611,9 @@ func NewSharedManager(ctx context.Context, st blob.Storage, f *FormattingOptions
return nil, errors.Wrap(err, "error setting up read manager caches")
}
sm.indexesLock.Lock()
defer sm.indexesLock.Unlock()
if err := sm.loadPackIndexesLocked(ctx); err != nil {
return nil, errors.Wrap(err, "error loading indexes")
}

View File

@@ -29,20 +29,26 @@ func (bm *WriteManager) RecoverIndexFromPackBlob(ctx context.Context, packFile b
var recovered []Info
bm.lock()
defer bm.unlock()
err = ndx.Iterate(AllIDs, func(i Info) error {
recovered = append(recovered, i)
if commit {
// 'i' is ephemeral and will depend on temporary buffers which
// won't be available when this function returns, we need to
// convert it to durable struct.
bm.packIndexBuilder.Add(ToInfoStruct(i))
}
// 'i' is ephemeral and will depend on temporary buffers which
// won't be available when this function returns, we need to
// convert it to durable struct.
is := ToInfoStruct(i)
recovered = append(recovered, is)
return nil
})
if commit {
bm.lock()
defer bm.unlock()
for _, is := range recovered {
bm.packIndexBuilder.Add(is)
}
}
return recovered, errors.Wrap(err, "error iterating index entries")
}

View File

@@ -79,11 +79,15 @@ type IndexBlobInfo struct {
// WriteManager builds content-addressable storage with encryption, deduplication and packaging on top of BLOB store.
type WriteManager struct {
revision int64 // changes on each local write
// +checkatomic
revision int64 // changes on each local write
// +checkatomic
disableIndexRefresh int32
mu *sync.RWMutex
cond *sync.Cond
mu sync.RWMutex
// +checklocks:mu
cond *sync.Cond
// +checklocks:mu
flushing bool
sessionUser string // user@host to report as session owners
@@ -92,13 +96,19 @@ type WriteManager struct {
currentSessionInfo SessionInfo
sessionMarkerBlobIDs []blob.ID // session marker blobs written so far
pendingPacks map[blob.ID]*pendingPackInfo
writingPacks []*pendingPackInfo // list of packs that are being written
failedPacks []*pendingPackInfo // list of packs that failed to write, will be retried
packIndexBuilder packIndexBuilder // contents that are in index currently being built (all packs saved but not committed)
// +checklocks:mu
pendingPacks map[blob.ID]*pendingPackInfo
// +checklocks:mu
writingPacks []*pendingPackInfo // list of packs that are being written
// +checklocks:mu
failedPacks []*pendingPackInfo // list of packs that failed to write, will be retried
// +checklocks:mu
packIndexBuilder packIndexBuilder // contents that are in index currently being built (all packs saved but not committed)
// +checklocks:mu
disableIndexFlushCount int
flushPackIndexesAfter time.Time // time when those indexes should be flushed
// +checklocks:mu
flushPackIndexesAfter time.Time // time when those indexes should be flushed
onUpload func(int64)
@@ -177,6 +187,7 @@ func (bm *WriteManager) maybeRefreshIndexes(ctx context.Context) error {
}
// Intentionally passing bi by value.
// +checklocks:bm.mu
func (bm *WriteManager) deletePreexistingContent(ctx context.Context, ci Info) error {
if ci.GetDeleted() {
return nil
@@ -223,7 +234,7 @@ func (bm *WriteManager) maybeRetryWritingFailedPacksUnlocked(ctx context.Context
for _, pp := range fp {
bm.log.Debugf("retry-write %v", pp.packBlobID)
if err := bm.writePackAndAddToIndexLocked(ctx, pp, true); err != nil {
if err := bm.writePackAndAddToIndexLocked(ctx, pp); err != nil {
return errors.Wrap(err, "error writing previously failed pack")
}
}
@@ -275,7 +286,7 @@ func (bm *WriteManager) addToPackUnlocked(ctx context.Context, contentID ID, dat
for _, pp := range fp {
bm.log.Debugf("retry-write %v", pp.packBlobID)
if err = bm.writePackAndAddToIndexLocked(ctx, pp, true); err != nil {
if err = bm.writePackAndAddToIndexLocked(ctx, pp); err != nil {
bm.unlock()
return errors.Wrap(err, "error writing previously failed pack")
}
@@ -298,6 +309,7 @@ func (bm *WriteManager) addToPackUnlocked(ctx context.Context, contentID ID, dat
}
if _, err := compressedAndEncrypted.Bytes().WriteTo(pp.currentPackData); err != nil {
bm.unlock()
return errors.Wrapf(err, "unable to append %q to pack data", contentID)
}
@@ -319,7 +331,7 @@ func (bm *WriteManager) addToPackUnlocked(ctx context.Context, contentID ID, dat
// at this point we're unlocked so different goroutines can encrypt and
// save to storage in parallel.
if shouldWrite {
if err := bm.writePackAndAddToIndexLocked(ctx, pp, false); err != nil {
if err := bm.acquireLockAndWritePackAndAddToIndex(ctx, pp); err != nil {
return errors.Wrap(err, "unable to write pack")
}
}
@@ -344,11 +356,13 @@ func (bm *WriteManager) EnableIndexFlush(ctx context.Context) {
bm.disableIndexFlushCount--
}
// +checklocks:bm.mu
func (bm *WriteManager) verifyInvariantsLocked() {
bm.verifyCurrentPackItemsLocked()
bm.verifyPackIndexBuilderLocked()
}
// +checklocks:bm.mu
func (bm *WriteManager) verifyCurrentPackItemsLocked() {
for _, pp := range bm.pendingPacks {
for k, cpi := range pp.currentPackItems {
@@ -363,6 +377,7 @@ func (bm *WriteManager) verifyCurrentPackItemsLocked() {
}
}
// +checklocks:bm.mu
func (bm *WriteManager) verifyPackIndexBuilderLocked() {
for k, cpi := range bm.packIndexBuilder {
bm.assertInvariant(cpi.GetContentID() == k, "content ID entry has invalid key: %v %v", cpi.GetContentID(), k)
@@ -390,6 +405,18 @@ func (bm *WriteManager) assertInvariant(ok bool, errorMsg string, arg ...interfa
panic(errorMsg)
}
// +checklocksread:bm.indexesLock
func (bm *WriteManager) writeIndexBlobs(ctx context.Context, dataShards []gather.Bytes, sessionID SessionID) ([]blob.Metadata, error) {
// nolint:wrapcheck
return bm.indexBlobManager.writeIndexBlobs(ctx, dataShards, sessionID)
}
// +checklocksread:bm.indexesLock
func (bm *WriteManager) addIndexBlob(ctx context.Context, indexBlobID blob.ID, data gather.Bytes, use bool) error {
return bm.committedContents.addIndexBlob(ctx, indexBlobID, data, use)
}
// +checklocks:bm.mu
func (bm *WriteManager) flushPackIndexesLocked(ctx context.Context) error {
if bm.disableIndexFlushCount > 0 {
bm.log.Debugf("not flushing index because flushes are currently disabled")
@@ -410,7 +437,7 @@ func (bm *WriteManager) flushPackIndexesLocked(ctx context.Context) error {
bm.indexesLock.RLock()
defer bm.indexesLock.RUnlock()
indexBlobMDs, err := bm.indexBlobManager.writeIndexBlobs(ctx, dataShards, bm.currentSessionInfo.ID)
indexBlobMDs, err := bm.writeIndexBlobs(ctx, dataShards, bm.currentSessionInfo.ID)
if err != nil {
return errors.Wrap(err, "error writing index blob")
}
@@ -425,7 +452,7 @@ func (bm *WriteManager) flushPackIndexesLocked(ctx context.Context) error {
for i, indexBlobMD := range indexBlobMDs {
bm.onUpload(int64(dataShards[i].Length()))
if err := bm.committedContents.addIndexBlob(ctx, indexBlobMD.BlobID, dataShards[i], true); err != nil {
if err := bm.addIndexBlob(ctx, indexBlobMD.BlobID, dataShards[i], true); err != nil {
return errors.Wrap(err, "unable to add committed content")
}
}
@@ -438,12 +465,13 @@ func (bm *WriteManager) flushPackIndexesLocked(ctx context.Context) error {
return nil
}
// +checklocks:bm.mu
func (bm *WriteManager) finishAllPacksLocked(ctx context.Context) error {
for prefix, pp := range bm.pendingPacks {
delete(bm.pendingPacks, prefix)
bm.writingPacks = append(bm.writingPacks, pp)
if err := bm.writePackAndAddToIndexLocked(ctx, pp, true); err != nil {
if err := bm.writePackAndAddToIndexLocked(ctx, pp); err != nil {
return errors.Wrap(err, "error writing pack content")
}
}
@@ -451,18 +479,19 @@ func (bm *WriteManager) finishAllPacksLocked(ctx context.Context) error {
return nil
}
func (bm *WriteManager) writePackAndAddToIndexLocked(ctx context.Context, pp *pendingPackInfo, holdingLock bool) error {
func (bm *WriteManager) acquireLockAndWritePackAndAddToIndex(ctx context.Context, pp *pendingPackInfo) error {
bm.lock()
defer bm.unlock()
defer bm.cond.Broadcast()
return bm.writePackAndAddToIndexLocked(ctx, pp)
}
// +checklocks:bm.mu
func (bm *WriteManager) writePackAndAddToIndexLocked(ctx context.Context, pp *pendingPackInfo) error {
packFileIndex, err := bm.prepareAndWritePackInternal(ctx, pp)
if !holdingLock {
bm.lock()
defer func() {
bm.cond.Broadcast()
bm.unlock()
}()
}
// after finishing writing, remove from both writingPacks and failedPacks
bm.writingPacks = removePendingPack(bm.writingPacks, pp)
bm.failedPacks = removePendingPack(bm.failedPacks, pp)
@@ -524,6 +553,11 @@ func (bm *WriteManager) Close(ctx context.Context) error {
return bm.SharedManager.release(ctx)
}
// +checklocks:bm.mu
func (bm *WriteManager) setFlushingLocked(v bool) {
bm.flushing = v
}
// Flush completes writing any pending packs and writes pack indexes to the underlying storage.
// Any pending writes completed before Flush() has started are guaranteed to be committed to the
// repository before Flush() returns.
@@ -533,14 +567,11 @@ func (bm *WriteManager) Flush(ctx context.Context) error {
bm.log.Debugf("flush")
bm.flushing = true
// when finished flushing, notify goroutines that were waiting for it.
defer bm.cond.Broadcast()
defer func() {
bm.flushing = false
// we finished flushing, notify goroutines that were waiting for it.
bm.cond.Broadcast()
}()
bm.setFlushingLocked(true)
defer bm.setFlushingLocked(false)
// see if we have any packs that have failed previously
// retry writing them now.
@@ -551,7 +582,7 @@ func (bm *WriteManager) Flush(ctx context.Context) error {
for _, pp := range fp {
bm.log.Debugf("retry-write %v", pp.packBlobID)
if err := bm.writePackAndAddToIndexLocked(ctx, pp, true); err != nil {
if err := bm.writePackAndAddToIndexLocked(ctx, pp); err != nil {
return errors.Wrap(err, "error writing previously failed pack")
}
}
@@ -644,6 +675,7 @@ func packPrefixForContentID(contentID ID) blob.ID {
return PackBlobIDPrefixRegular
}
// +checklocks:bm.mu
func (bm *WriteManager) getOrCreatePendingPackInfoLocked(ctx context.Context, prefix blob.ID) (*pendingPackInfo, error) {
if pp := bm.pendingPacks[prefix]; pp != nil {
return pp, nil
@@ -745,6 +777,7 @@ func (bm *WriteManager) GetContent(ctx context.Context, contentID ID) (v []byte,
return tmp.ToByteSlice(), nil
}
// +checklocksread:bm.mu
func (bm *WriteManager) getOverlayContentInfoReadLocked(contentID ID) (*pendingPackInfo, Info, bool) {
// check added contents, not written to any packs yet.
for _, pp := range bm.pendingPacks {
@@ -768,6 +801,7 @@ func (bm *WriteManager) getOverlayContentInfoReadLocked(contentID ID) (*pendingP
return nil, nil, false
}
// +checklocksread:bm.mu
func (bm *WriteManager) getContentInfoReadLocked(ctx context.Context, contentID ID) (*pendingPackInfo, Info, error) {
if pp, ci, ok := bm.getOverlayContentInfoReadLocked(contentID); ok {
return pp, ci, nil
@@ -802,10 +836,12 @@ func (bm *WriteManager) DisableIndexRefresh() {
atomic.StoreInt32(&bm.disableIndexRefresh, 1)
}
// +checklocksacquire:bm.mu
func (bm *WriteManager) lock() {
bm.mu.Lock()
}
// +checklocksrelease:bm.mu
func (bm *WriteManager) unlock() {
if bm.checkInvariantsOnUnlock {
bm.verifyInvariantsLocked()
@@ -863,20 +899,15 @@ type SessionOptions struct {
// NewWriteManager returns a session write manager.
func NewWriteManager(ctx context.Context, sm *SharedManager, options SessionOptions, writeManagerID string) *WriteManager {
mu := &sync.RWMutex{}
sm.addRef()
if options.OnUpload == nil {
options.OnUpload = func(int64) {}
}
return &WriteManager{
wm := &WriteManager{
SharedManager: sm,
mu: mu,
cond: sync.NewCond(mu),
flushPackIndexesAfter: sm.timeNow().Add(flushPackIndexTimeout),
pendingPacks: map[blob.ID]*pendingPackInfo{},
packIndexBuilder: make(packIndexBuilder),
@@ -886,4 +917,8 @@ func NewWriteManager(ctx context.Context, sm *SharedManager, options SessionOpti
log: sm.namedLogger(writeManagerID),
}
wm.cond = sync.NewCond(&wm.mu)
return wm
}

View File

@@ -25,6 +25,7 @@
const TextLogBlobPrefix = "_log_"
type internalLogManager struct {
// +checkatomic
enabled int32 // set by enable(), logger is ineffective until called
// internalLogManager implements io.Writer and we must be able to write to the
@@ -97,14 +98,20 @@ func (m *internalLogManager) NewLogger() *zap.SugaredLogger {
// internalLogger represents a single log session that saves log files as blobs in the repository.
// The logger starts disabled and to actually persist logs enable() must be called.
type internalLogger struct {
// +checkatomic
nextChunkNumber int32 // chunk number incremented using atomic.AddInt32()
m *internalLogManager
mu sync.Mutex
buf *gather.WriteBuffer
gzw *gzip.Writer
m *internalLogManager
mu sync.Mutex
// +checklocks:mu
buf *gather.WriteBuffer
// +checklocks:mu
gzw *gzip.Writer
startTime int64 // unix timestamp of the first log
prefix blob.ID
prefix blob.ID // +checklocksignore
}
func (m *internalLogManager) enable() {
@@ -133,9 +140,7 @@ func (l *internalLogger) maybeEncryptAndWriteChunkUnlocked(data gather.Bytes, cl
endTime := l.m.timeFunc().Unix()
l.mu.Lock()
prefix := blob.ID(fmt.Sprintf("%v_%v_%v_%v_", l.prefix, l.startTime, endTime, atomic.AddInt32(&l.nextChunkNumber, 1)))
l.mu.Unlock()
l.m.encryptAndWriteLogBlob(prefix, data, closeFunc)
}
@@ -156,6 +161,7 @@ func (l *internalLogger) addAndMaybeFlush(b []byte) (payload gather.Bytes, close
return l.flushAndResetLocked()
}
// +checklocks:l.mu
func (l *internalLogger) ensureWriterInitializedLocked() io.Writer {
if l.gzw == nil {
l.buf = gather.NewWriteBuffer()
@@ -166,6 +172,7 @@ func (l *internalLogger) ensureWriterInitializedLocked() io.Writer {
return l.gzw
}
// +checklocks:l.mu
func (l *internalLogger) flushAndResetLocked() (payload gather.Bytes, closeFunc func()) {
if l.gzw == nil {
return gather.Bytes{}, func() {}

View File

@@ -8,17 +8,27 @@
type Stats struct {
// Keep int64 fields first to ensure they get aligned to at least 64-bit
// boundaries, which is required for atomic access on ARM and x86-32.
readBytes int64
writtenBytes int64
// +checkatomic
readBytes int64
// +checkatomic
writtenBytes int64
// +checkatomic
decryptedBytes int64
// +checkatomic
encryptedBytes int64
hashedBytes int64
// +checkatomic
hashedBytes int64
readContents uint32
// +checkatomic
readContents uint32
// +checkatomic
writtenContents uint32
hashedContents uint32
// +checkatomic
hashedContents uint32
// +checkatomic
invalidContents uint32
validContents uint32
// +checkatomic
validContents uint32
}
// Reset clears all content statistics.
@@ -38,17 +48,17 @@ func (s *Stats) Reset() {
// ReadContent returns the approximate read content count and their total size in bytes.
func (s *Stats) ReadContent() (count uint32, bytes int64) {
return readCountSum(&s.readContents, &s.readBytes)
return atomic.LoadUint32(&s.readContents), atomic.LoadInt64(&s.readBytes)
}
// WrittenContent returns the approximate written content count and their total size in bytes.
func (s *Stats) WrittenContent() (count uint32, bytes int64) {
return readCountSum(&s.writtenContents, &s.writtenBytes)
return atomic.LoadUint32(&s.writtenContents), atomic.LoadInt64(&s.writtenBytes)
}
// HashedContent returns the approximate hashed content count and their total size in bytes.
func (s *Stats) HashedContent() (count uint32, bytes int64) {
return readCountSum(&s.hashedContents, &s.hashedBytes)
return atomic.LoadUint32(&s.hashedContents), atomic.LoadInt64(&s.hashedBytes)
}
// DecryptedBytes returns the approximate total number of decrypted bytes.
@@ -80,15 +90,15 @@ func (s *Stats) encrypted(size int) int64 {
}
func (s *Stats) readContent(size int) (count uint32, sum int64) {
return updateCountSum(&s.readContents, &s.readBytes, size)
return atomic.AddUint32(&s.readContents, 1), atomic.AddInt64(&s.readBytes, int64(size))
}
func (s *Stats) wroteContent(size int) (count uint32, sum int64) {
return updateCountSum(&s.writtenContents, &s.writtenBytes, size)
return atomic.AddUint32(&s.writtenContents, 1), atomic.AddInt64(&s.writtenBytes, int64(size))
}
func (s *Stats) hashedContent(size int) (count uint32, sum int64) {
return updateCountSum(&s.hashedContents, &s.hashedBytes, size)
return atomic.AddUint32(&s.hashedContents, 1), atomic.AddInt64(&s.hashedBytes, int64(size))
}
func (s *Stats) foundValidContent() uint32 {
@@ -98,11 +108,3 @@ func (s *Stats) foundValidContent() uint32 {
func (s *Stats) foundInvalidContent() uint32 {
return atomic.AddUint32(&s.invalidContents, 1)
}
func updateCountSum(count *uint32, sum *int64, delta int) (updatedCount uint32, updatedSum int64) {
return atomic.AddUint32(count, 1), atomic.AddInt64(sum, int64(delta))
}
func readCountSum(count *uint32, sum *int64) (c uint32, s int64) {
return atomic.LoadUint32(count), atomic.LoadInt64(sum)
}

View File

@@ -59,13 +59,16 @@ type grpcRepositoryClient struct {
conn *grpc.ClientConn
innerSessionMutex sync.Mutex
innerSession *grpcInnerSession
// +checklocks:innerSessionMutex
innerSession *grpcInnerSession
opt WriteSessionOptions
isReadOnly bool
transparentRetries bool
// how many times we tried to establish inner session
// +checklocks:innerSessionMutex
innerSessionAttemptCount int
asyncWritesSemaphore chan struct{}
@@ -86,10 +89,15 @@ type grpcInnerSession struct {
sendMutex sync.Mutex
activeRequestsMutex sync.Mutex
nextRequestID int64
activeRequests map[int64]chan *apipb.SessionResponse
cli apipb.KopiaRepository_SessionClient
repoParams *apipb.RepositoryParameters
// +checklocks:activeRequestsMutex
nextRequestID int64
// +checklocks:activeRequestsMutex
activeRequests map[int64]chan *apipb.SessionResponse
cli apipb.KopiaRepository_SessionClient
repoParams *apipb.RepositoryParameters
}
// readLoop runs in a goroutine and consumes all messages in session and forwards them to appropriate channels.
@@ -152,6 +160,7 @@ func (r *grpcInnerSession) sendRequest(ctx context.Context, req *apipb.SessionRe
return ch
}
// +checklocks:r.activeRequestsMutex
func (r *grpcInnerSession) getAndDeleteResponseChannelLocked(rid int64) chan *apipb.SessionResponse {
ch := r.activeRequests[rid]
delete(r.activeRequests, rid)

View File

@@ -21,12 +21,16 @@
type committedManifestManager struct {
b contentManager
debugID string
debugID string // +checklocksignore
cmmu sync.Mutex
lastRevision int64
locked bool
committedEntries map[ID]*manifestEntry
cmmu sync.Mutex
// +checklocks:cmmu
lastRevision int64
// +checklocks:cmmu
locked bool
// +checklocks:cmmu
committedEntries map[ID]*manifestEntry
// +checklocks:cmmu
committedContentIDs map[content.ID]bool
}
@@ -41,6 +45,7 @@ func (m *committedManifestManager) getCommittedEntryOrNil(ctx context.Context, i
return m.committedEntries[id], nil
}
// +checklocks:m.cmmu
func (m *committedManifestManager) dump(ctx context.Context, prefix string) {
if m.debugID == "" {
return
@@ -86,6 +91,7 @@ func (m *committedManifestManager) commitEntries(ctx context.Context, entries ma
// NOTE: this function is used in two cases - to write pending entries (where the caller acquires
// the lock via commitEntries()) and to compact existing committed entries during compaction
// where the lock is already being held.
// +checklocks:m.cmmu
func (m *committedManifestManager) writeEntriesLocked(ctx context.Context, entries map[ID]*manifestEntry) (map[content.ID]bool, error) {
if len(entries) == 0 {
return nil, nil
@@ -120,6 +126,7 @@ func (m *committedManifestManager) writeEntriesLocked(ctx context.Context, entri
return map[content.ID]bool{contentID: true}, nil
}
// +checklocks:m.cmmu
func (m *committedManifestManager) loadCommittedContentsLocked(ctx context.Context) error {
m.verifyLocked()
@@ -168,6 +175,7 @@ func (m *committedManifestManager) loadCommittedContentsLocked(ctx context.Conte
return nil
}
// +checklocks:m.cmmu
func (m *committedManifestManager) loadManifestContentsLocked(manifests map[content.ID]manifest) {
m.committedEntries = map[ID]*manifestEntry{}
m.committedContentIDs = map[content.ID]bool{}
@@ -197,6 +205,7 @@ func (m *committedManifestManager) compact(ctx context.Context) error {
return m.compactLocked(ctx)
}
// +checklocks:m.cmmu
func (m *committedManifestManager) maybeCompactLocked(ctx context.Context) error {
m.verifyLocked()
@@ -217,6 +226,7 @@ func (m *committedManifestManager) maybeCompactLocked(ctx context.Context) error
return nil
}
// +checklocks:m.cmmu
func (m *committedManifestManager) compactLocked(ctx context.Context) error {
m.verifyLocked()
@@ -258,6 +268,7 @@ func (m *committedManifestManager) compactLocked(ctx context.Context) error {
return nil
}
// +checklocks:m.cmmu
func (m *committedManifestManager) mergeEntryLocked(e *manifestEntry) {
m.verifyLocked()
@@ -272,6 +283,7 @@ func (m *committedManifestManager) mergeEntryLocked(e *manifestEntry) {
}
}
// +checklocks:m.cmmu
func (m *committedManifestManager) ensureInitializedLocked(ctx context.Context) error {
rev := m.b.Revision()
if m.lastRevision == rev {
@@ -297,16 +309,19 @@ func (m *committedManifestManager) ensureInitializedLocked(ctx context.Context)
return nil
}
// +checklocksacquire:m.cmmu
func (m *committedManifestManager) lock() {
m.cmmu.Lock()
m.locked = true
}
// +checklocksrelease:m.cmmu
func (m *committedManifestManager) unlock() {
m.locked = false
m.cmmu.Unlock()
}
// +checklocks:m.cmmu
func (m *committedManifestManager) verifyLocked() {
if !m.locked {
panic("not locked")

View File

@@ -24,7 +24,7 @@
manifestIDLength = 16
)
var log = logging.Module("kopia/manifest")
var log = logging.Module("kopia/manifest") // +checklocksignore
// ErrNotFound is returned when the metadata item is not found.
var ErrNotFound = errors.New("not found")
@@ -57,6 +57,7 @@ type Manager struct {
mu sync.Mutex
b contentManager
// +checklocks:mu
pendingEntries map[ID]*manifestEntry
committed *committedManifestManager

View File

@@ -32,9 +32,13 @@
var errSomeError = errors.Errorf("some error")
type fakeContentManager struct {
mu sync.Mutex
data map[content.ID][]byte
compresionIDs map[content.ID]compression.HeaderID
mu sync.Mutex
// +checklocks:mu
data map[content.ID][]byte
// +checklocks:mu
compresionIDs map[content.ID]compression.HeaderID
supportsContentCompression bool
writeContentError error
}

View File

@@ -34,7 +34,8 @@ type Writer interface {
}
type contentIDTracker struct {
mu sync.Mutex
mu sync.Mutex
// +checklocks:mu
contents map[content.ID]bool
}

View File

@@ -7,10 +7,13 @@
)
type recentlyRead struct {
mu sync.Mutex
mu sync.Mutex
// +checklocks:mu
contentList []content.ID
next int
contentSet map[content.ID]struct{}
// +checklocks:mu
next int
// +checklocks:mu
contentSet map[content.ID]struct{}
}
func (r *recentlyRead) add(contentID content.ID) {

View File

@@ -172,21 +172,27 @@ type StorageStats struct {
// StorageUsageDetails provides details about snapshot storage usage.
type StorageUsageDetails struct {
// number of bytes in all objects (ignoring content-level deduplication).
// +checkatomic
ObjectBytes int64 `json:"objectBytes"`
// number of bytes in all unique contents (original).
// +checkatomic
OriginalContentBytes int64 `json:"originalContentBytes"`
// number of bytes in all unique contents as stored in the repository.
// +checkatomic
PackedContentBytes int64 `json:"packedContentBytes"`
// number of unique file objects.
// +checkatomic
FileObjectCount int32 `json:"fileObjects"`
// number of unique objects.
// +checkatomic
DirObjectCount int32 `json:"dirObjects"`
// number of unique contents.
// +checkatomic
ContentCount int32 `json:"contents"`
}

View File

@@ -32,18 +32,29 @@ type Output interface {
// Stats represents restore statistics.
type Stats struct {
// +checkatomic
RestoredTotalFileSize int64
// +checkatomic
EnqueuedTotalFileSize int64
SkippedTotalFileSize int64
// +checkatomic
SkippedTotalFileSize int64
RestoredFileCount int32
RestoredDirCount int32
// +checkatomic
RestoredFileCount int32
// +checkatomic
RestoredDirCount int32
// +checkatomic
RestoredSymlinkCount int32
EnqueuedFileCount int32
EnqueuedDirCount int32
// +checkatomic
EnqueuedFileCount int32
// +checkatomic
EnqueuedDirCount int32
// +checkatomic
EnqueuedSymlinkCount int32
SkippedCount int32
IgnoredErrorCount int32
// +checkatomic
SkippedCount int32
// +checkatomic
IgnoredErrorCount int32
}
func (s *Stats) clone() Stats {

View File

@@ -17,6 +17,7 @@
type checkpointRegistry struct {
mu sync.Mutex
// +checklocks:mu
checkpoints map[string]checkpointFunc
}

View File

@@ -4,6 +4,7 @@
"context"
"fmt"
"path/filepath"
"sync/atomic"
"github.com/kopia/kopia/fs"
"github.com/kopia/kopia/fs/ignorefs"
@@ -88,13 +89,13 @@ func Estimate(ctx context.Context, rep repo.Repository, entry fs.Directory, poli
ed = append(ed, relativePath)
}
stats.ExcludedDirCount++
atomic.AddInt32(&stats.ExcludedDirCount, 1)
estimateLog(ctx).Debugf("excluded dir %v", relativePath)
} else {
estimateLog(ctx).Debugf("excluded file %v (%v)", relativePath, units.BytesStringBase10(e.Size()))
stats.ExcludedFileCount++
stats.ExcludedTotalFileSize += e.Size()
atomic.AddInt32(&stats.ExcludedFileCount, 1)
atomic.AddInt64(&stats.ExcludedTotalFileSize, e.Size())
eb.add(relativePath, e.Size(), maxExamplesPerBucket)
}
}
@@ -116,7 +117,7 @@ func estimate(ctx context.Context, relativePath string, entry fs.Entry, policyTr
switch entry := entry.(type) {
case fs.Directory:
stats.TotalDirectoryCount++
atomic.AddInt32(&stats.TotalDirectoryCount, 1)
progress.Processing(ctx, relativePath)
@@ -125,9 +126,9 @@ func estimate(ctx context.Context, relativePath string, entry fs.Entry, policyTr
isIgnored := policyTree.EffectivePolicy().ErrorHandlingPolicy.IgnoreDirectoryErrors.OrDefault(false)
if isIgnored {
stats.IgnoredErrorCount++
atomic.AddInt32(&stats.IgnoredErrorCount, 1)
} else {
stats.ErrorCount++
atomic.AddInt32(&stats.ErrorCount, 1)
}
progress.Error(ctx, relativePath, err, isIgnored)
@@ -143,8 +144,8 @@ func estimate(ctx context.Context, relativePath string, entry fs.Entry, policyTr
case fs.File:
ib.add(relativePath, entry.Size(), maxExamplesPerBucket)
stats.TotalFileCount++
stats.TotalFileSize += entry.Size()
atomic.AddInt32(&stats.TotalFileCount, 1)
atomic.AddInt64(&stats.TotalFileSize, entry.Size())
}
return nil

View File

@@ -26,9 +26,11 @@ type TreeWalker struct {
enqueued sync.Map
wp *workshare.Pool
mu sync.Mutex
mu sync.Mutex
// +checklocks:mu
numErrors int
errors []error
// +checklocks:mu
errors []error
}
func oidOf(e fs.Entry) object.ID {

View File

@@ -60,6 +60,7 @@
// Uploader supports efficient uploading files and directories to repository.
type Uploader struct {
// values aligned to 8-bytes due to atomic access
// +checkatomic
totalWrittenBytes int64
Progress UploadProgress
@@ -94,7 +95,9 @@ type Uploader struct {
repo repo.RepositoryWriter
// stats must be allocated on heap to enforce 64-bit alignment due to atomic access on ARM.
stats *snapshot.Stats
stats *snapshot.Stats
// +checkatomic
canceled int32
getTicker func(time.Duration) <-chan time.Time
@@ -542,7 +545,9 @@ func rootCauseError(err error) error {
type dirManifestBuilder struct {
mu sync.Mutex
// +checklocks:mu
summary fs.DirectorySummary
// +checklocks:mu
entries []*snapshot.DirEntry
}
@@ -615,7 +620,9 @@ func (b *dirManifestBuilder) Build(dirModTime time.Time, incompleteReason string
s := b.summary
s.TotalDirCount++
if len(b.entries) == 0 {
entries := b.entries
if len(entries) == 0 {
s.MaxModTime = dirModTime
}
@@ -625,18 +632,18 @@ func (b *dirManifestBuilder) Build(dirModTime time.Time, incompleteReason string
// sort the result, directories first, then non-directories, ordered by name
sort.Slice(b.entries, func(i, j int) bool {
if leftDir, rightDir := isDir(b.entries[i]), isDir(b.entries[j]); leftDir != rightDir {
if leftDir, rightDir := isDir(entries[i]), isDir(entries[j]); leftDir != rightDir {
// directories get sorted before non-directories
return leftDir
}
return b.entries[i].Name < b.entries[j].Name
return entries[i].Name < entries[j].Name
})
return &snapshot.DirManifest{
StreamType: directoryStreamType,
Summary: &s,
Entries: b.entries,
Entries: entries,
}
}
@@ -1280,7 +1287,7 @@ func (u *Uploader) Upload(
defer u.workerPool.Close()
u.stats = &snapshot.Stats{}
u.totalWrittenBytes = 0
atomic.StoreInt64(&u.totalWrittenBytes, 0)
var err error

View File

@@ -95,21 +95,32 @@ func (p *NullUploadProgress) Error(path string, err error, isIgnored bool) {}
// UploadCounters represents a snapshot of upload counters.
type UploadCounters struct {
TotalCachedBytes int64 `json:"cachedBytes"`
TotalHashedBytes int64 `json:"hashedBytes"`
// +checkatomic
TotalCachedBytes int64 `json:"cachedBytes"`
// +checkatomic
TotalHashedBytes int64 `json:"hashedBytes"`
// +checkatomic
TotalUploadedBytes int64 `json:"uploadedBytes"`
// +checkatomic
EstimatedBytes int64 `json:"estimatedBytes"`
// +checkatomic
TotalCachedFiles int32 `json:"cachedFiles"`
// +checkatomic
TotalHashedFiles int32 `json:"hashedFiles"`
// +checkatomic
TotalExcludedFiles int32 `json:"excludedFiles"`
TotalExcludedDirs int32 `json:"excludedDirs"`
// +checkatomic
TotalExcludedDirs int32 `json:"excludedDirs"`
FatalErrorCount int32 `json:"errors"`
// +checkatomic
FatalErrorCount int32 `json:"errors"`
// +checkatomic
IgnoredErrorCount int32 `json:"ignoredErrors"`
EstimatedFiles int32 `json:"estimatedFiles"`
// +checkatomic
EstimatedFiles int32 `json:"estimatedFiles"`
CurrentDirectory string `json:"directory"`
@@ -175,9 +186,9 @@ func (p *CountingUploadProgress) Error(path string, err error, isIgnored bool) {
defer p.mu.Unlock()
if isIgnored {
p.counters.IgnoredErrorCount++
atomic.AddInt32(&p.counters.IgnoredErrorCount, 1)
} else {
p.counters.FatalErrorCount++
atomic.AddInt32(&p.counters.FatalErrorCount, 1)
}
p.counters.LastErrorPath = path

View File

@@ -2,6 +2,7 @@
import (
"context"
"sync/atomic"
"github.com/kopia/kopia/fs"
"github.com/kopia/kopia/snapshot"
@@ -19,8 +20,8 @@ func (e *scanResults) Processing(ctx context.Context, pathname string) {}
func (e *scanResults) Stats(ctx context.Context, s *snapshot.Stats, includedFiles, excludedFiles SampleBuckets, excludedDirs []string, final bool) {
if final {
e.numFiles = int(s.TotalFileCount)
e.totalFileSize = s.TotalFileSize
e.numFiles = int(atomic.LoadInt32(&s.TotalFileCount))
e.totalFileSize = atomic.LoadInt64(&s.TotalFileSize)
}
}

View File

@@ -8,6 +8,7 @@
"path/filepath"
"reflect"
"sort"
"sync/atomic"
"testing"
"time"
@@ -147,16 +148,16 @@ func TestUpload(t *testing.T) {
t.Errorf("expected s1.RootObjectID==s2.RootObjectID, got %v and %v", s1.RootObjectID().String(), s2.RootObjectID().String())
}
if got, want := s1.Stats.CachedFiles, int32(0); got != want {
if got, want := atomic.LoadInt32(&s1.Stats.CachedFiles), int32(0); got != want {
t.Errorf("unexpected s1 cached files: %v, want %v", got, want)
}
// All non-cached files from s1 are now cached and there are no non-cached files since nothing changed.
if got, want := s2.Stats.CachedFiles, s1.Stats.NonCachedFiles; got != want {
if got, want := atomic.LoadInt32(&s2.Stats.CachedFiles), atomic.LoadInt32(&s1.Stats.NonCachedFiles); got != want {
t.Errorf("unexpected s2 cached files: %v, want %v", got, want)
}
if got, want := s2.Stats.NonCachedFiles, int32(0); got != want {
if got, want := atomic.LoadInt32(&s2.Stats.NonCachedFiles), int32(0); got != want {
t.Errorf("unexpected non-cached files: %v", got)
}
@@ -172,7 +173,7 @@ func TestUpload(t *testing.T) {
t.Errorf("expected s3.RootObjectID!=s2.RootObjectID, got %v", s3.RootObjectID().String())
}
if s3.Stats.NonCachedFiles != 1 {
if atomic.LoadInt32(&s3.Stats.NonCachedFiles) != 1 {
// one file is not cached, which causes "./d2/d1/", "./d2/" and "./" to be changed.
t.Errorf("unexpected s3 stats: %+v", s3.Stats)
}
@@ -190,7 +191,7 @@ func TestUpload(t *testing.T) {
}
// Everything is still cached.
if s4.Stats.CachedFiles != s1.Stats.NonCachedFiles || s4.Stats.NonCachedFiles != 0 {
if atomic.LoadInt32(&s4.Stats.CachedFiles) != atomic.LoadInt32(&s1.Stats.NonCachedFiles) || atomic.LoadInt32(&s4.Stats.NonCachedFiles) != 0 {
t.Errorf("unexpected s4 stats: %+v", s4.Stats)
}
@@ -203,7 +204,7 @@ func TestUpload(t *testing.T) {
t.Errorf("expected s4.RootObjectID==s5.RootObjectID, got %v and %v", s4.RootObjectID(), s5.RootObjectID())
}
if s5.Stats.NonCachedFiles != 0 {
if atomic.LoadInt32(&s5.Stats.NonCachedFiles) != 0 {
// no files are changed, but one file disappeared which caused "./d2/d1/", "./d2/" and "./" to be changed.
t.Errorf("unexpected s5 stats: %+v", s5.Stats)
}
@@ -694,21 +695,21 @@ func TestUpload_VirtualDirectoryWithStreamingFile(t *testing.T) {
t.Fatalf("Upload error: %v", err)
}
if got, want := man.Stats.CachedFiles, int32(0); got != want {
if got, want := atomic.LoadInt32(&man.Stats.CachedFiles), int32(0); got != want {
t.Fatalf("unexpected manifest cached files: %v, want %v", got, want)
}
if got, want := man.Stats.NonCachedFiles, int32(1); got != want {
if got, want := atomic.LoadInt32(&man.Stats.NonCachedFiles), int32(1); got != want {
// one file is not cached
t.Fatalf("unexpected manifest non-cached files: %v, want %v", got, want)
}
if got, want := man.Stats.TotalDirectoryCount, int32(1); got != want {
if got, want := atomic.LoadInt32(&man.Stats.TotalDirectoryCount), int32(1); got != want {
// must have one directory
t.Fatalf("unexpected manifest directory count: %v, want %v", got, want)
}
if got, want := man.Stats.TotalFileCount, int32(1); got != want {
if got, want := atomic.LoadInt32(&man.Stats.TotalFileCount), int32(1); got != want {
// must have one file
t.Fatalf("unexpected manifest file count: %v, want %v", got, want)
}

View File

@@ -38,7 +38,7 @@ type Checker struct {
DeleteLimit int
mu sync.RWMutex
SnapIDIndex snapmeta.Index
SnapIDIndex snapmeta.Index // +checklocksignore
}
// NewChecker instantiates a new Checker, returning its pointer. A temporary

View File

@@ -17,8 +17,10 @@
// delegate to a specific client FileWriter.
type MultiClientFileWriter struct {
// Map of client ID to FileWriter and associated mutex
mu sync.RWMutex
// +checklocks:mu
fileWriters map[string]FileWriter
mu sync.RWMutex
// Function used to generate new FileWriters
newFileWriter newFileWriterFn

View File

@@ -36,7 +36,9 @@ type Unlocker interface {
// that has already been Locked. The thread will be blocked until the holder
// of the lock calls Unlock.
type pathLock struct {
mu sync.Mutex
mu sync.Mutex
// +checklocks:mu
lockedPaths map[string]chan struct{}
}

View File

@@ -11,6 +11,7 @@
"log"
"os"
"path/filepath"
"sync/atomic"
"github.com/pkg/errors"
@@ -99,7 +100,7 @@ func (kc *KopiaClient) SnapshotCreate(ctx context.Context, key string, val []byt
return errors.Wrap(err, "cannot get manifest")
}
log.Printf("snapshotting %v", units.BytesStringBase10(man.Stats.TotalFileSize))
log.Printf("snapshotting %v", units.BytesStringBase10(atomic.LoadInt64(&man.Stats.TotalFileSize)))
if _, err := snapshot.SaveSnapshot(ctx, rw, man); err != nil {
return errors.Wrap(err, "cannot save snapshot")

View File

@@ -103,6 +103,7 @@ endif
# tool versions
GOLANGCI_LINT_VERSION=1.45.0
CHECKLOCKS_VERSION=release-20220314.0
NODE_VERSION=16.13.0
HUGO_VERSION=0.89.2
GOTESTSUM_VERSION=1.7.0
@@ -149,6 +150,14 @@ endif
$(linter):
go run github.com/kopia/kopia/tools/gettool --tool linter:$(GOLANGCI_LINT_VERSION) --output-dir $(linter_dir)
# checklocks
checklocks_dir=$(TOOLS_DIR)$(slash)checklocks-$(CHECKLOCKS_VERSION)
checklocks=$(checklocks_dir)$(slash)bin$(slash)checklocks$(exe_suffix)
$(checklocks): export GOPATH=$(checklocks_dir)
$(checklocks):
go install gvisor.dev/gvisor/tools/checklocks/cmd/checklocks@$(CHECKLOCKS_VERSION)
# hugo
hugo_dir=$(TOOLS_DIR)$(slash)hugo-$(HUGO_VERSION)
hugo=$(hugo_dir)/hugo$(exe_suffix)