build(deps): Go 1.25 (#4987)

Upgrade to Go 1.25
Leverage `WaitGroup.Go` in Go 1.25
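
For context, the mechanical change applied throughout the changed files: the pre-1.25 pattern of wg.Add(1) followed by go func() { defer wg.Done(); ... }() is replaced with sync.WaitGroup.Go, added in Go 1.25, which increments the counter, runs the function in a new goroutine, and calls Done when it returns. A minimal before/after sketch (the worker bodies and counts are illustrative, not taken from the Kopia code):

package main

import (
    "fmt"
    "sync"
)

func main() {
    var wg sync.WaitGroup

    // Before Go 1.25: manage the counter by hand.
    for i := range 3 {
        wg.Add(1)

        go func() {
            defer wg.Done()

            fmt.Println("old-style worker", i)
        }()
    }

    // Go 1.25: WaitGroup.Go adds 1 to the counter, starts the function in a
    // new goroutine, and calls Done when it returns.
    for i := range 3 {
        wg.Go(func() {
            fmt.Println("wg.Go worker", i)
        })
    }

    wg.Wait()
}

Besides removing boilerplate, wg.Go ties the Add and the Done to a single call site, so the counter cannot drift if an early return is later added to the goroutine body.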
Julio Lopez
2025-11-17 16:42:12 -08:00
committed by GitHub
parent 1aaf433cdc
commit 7db061ee71
26 changed files with 57 additions and 166 deletions

@@ -37,7 +37,7 @@ jobs:
         uses: golang/govulncheck-action@b625fbe08f3bccbe446d94fbf87fcc875a4f50ee # v1.0.4
         with:
           cache: false
-          go-version-input: '1.24.10'
+          go-version-input: '1.25.4'
           # An explicit Go version is needed for govulncheck-action since internally
           # it uses an outdated setup-go@v5.0 action that does not respect the 'toolchain'
           # directive in the 'go.mod' file.

@@ -58,13 +58,9 @@ func runInParallel[A, T any](args []A, run func(arg A) T) T {
     var wg sync.WaitGroup
     for _, arg := range args[1:] {
-        wg.Add(1)
-        go func() {
-            defer wg.Done()
+        wg.Go(func() {
             run(arg)
-        }()
+        })
     }
     // run one on the main goroutine and N-1 in parallel.

@@ -51,13 +51,10 @@ func (c *commandContentVerify) run(ctx context.Context, rep repo.DirectRepositor
     }()
     // start a goroutine that will populate totalCount
-    wg.Add(1)
-    go func() {
-        defer wg.Done()
+    wg.Go(func() {
         c.getTotalContentCount(subctx, rep, &totalCount)
-    }()
+    })
     rep.DisableIndexRefresh()

@@ -43,13 +43,9 @@ func (c *commandIndexInspect) run(ctx context.Context, rep repo.DirectRepository
     var wg sync.WaitGroup
-    wg.Add(1)
-    go func() {
-        defer wg.Done()
+    wg.Go(func() {
         c.dumpIndexBlobEntries(output)
-    }()
+    })
     err := c.runWithOutput(ctx, rep, output)
     close(output)

go.mod
@@ -1,8 +1,8 @@
 module github.com/kopia/kopia
-go 1.24.0
+go 1.25
-toolchain go1.24.10
+toolchain go1.25.4
 require (
     cloud.google.com/go/storage v1.57.1

@@ -178,16 +178,12 @@ func testGetContentForDifferentContentIDsExecutesInParallel(t *testing.T, newCac
     var wg sync.WaitGroup
     for i := range 20 {
-        wg.Add(1)
-        go func() {
-            defer wg.Done()
+        wg.Go(func() {
             var tmp gather.WriteBuffer
             defer tmp.Close()
             dataCache.GetContent(ctx, fmt.Sprintf("c%v", i), "blob1", int64(i), 1, &tmp)
-        }()
+        })
     }
     wg.Wait()
@@ -226,16 +222,12 @@ func testGetContentForDifferentBlobsExecutesInParallel(t *testing.T, newCache ne
     var wg sync.WaitGroup
     for i := range 20 {
-        wg.Add(1)
-        go func() {
-            defer wg.Done()
+        wg.Go(func() {
             var tmp gather.WriteBuffer
             defer tmp.Close()
             dataCache.GetContent(ctx, fmt.Sprintf("c%v", i), blob.ID(fmt.Sprintf("blob%v", i)), int64(i), 1, &tmp)
-        }()
+        })
     }
     wg.Wait()
@@ -273,16 +265,12 @@ func testGetContentRaceFetchesOnce(t *testing.T, newCache newContentCacheFunc) {
     var wg sync.WaitGroup
     for range 20 {
-        wg.Add(1)
-        go func() {
-            defer wg.Done()
+        wg.Go(func() {
             var tmp gather.WriteBuffer
             defer tmp.Close()
             dataCache.GetContent(ctx, "c1", "blob1", 0, 1, &tmp)
-        }()
+        })
     }
     wg.Wait()

@@ -239,15 +239,7 @@ func (c *loggingFlags) setupLogFileBasedLogger(now time.Time, subdir, suffix, lo
         logFileBaseName: logFileBaseName,
         symlinkName: symlinkName,
         maxSegmentSize: c.logFileMaxSegmentSize,
-        startSweep: func() {
-            sweepLogWG.Add(1)
-            go func() {
-                defer sweepLogWG.Done()
-                doSweep()
-            }()
-        },
+        startSweep: func() { sweepLogWG.Go(doSweep) },
     }
     if c.waitForLogSweep {

@@ -37,10 +37,7 @@ func (w *BlobWriter) EncryptAndWriteBlobAsync(ctx context.Context, prefix blob.I
     b := encrypted.Bytes()
-    w.wg.Add(1)
-    go func() {
-        defer w.wg.Done()
+    w.wg.Go(func() {
         defer encrypted.Close()
         defer closeFunc()
@@ -49,7 +46,7 @@ func (w *BlobWriter) EncryptAndWriteBlobAsync(ctx context.Context, prefix blob.I
             log(ctx).Warnf("unable to write diagnostics blob: %v", err)
             return
         }
-    }()
+    })
 }
 // Wait waits for all the writes to complete.

@@ -61,13 +61,9 @@ func Start(ctx context.Context, getItems GetItemsFunc, opts Options) *Scheduler
         Debug: opts.Debug,
     }
-    s.wg.Add(1)
-    go func() {
-        defer s.wg.Done()
+    s.wg.Go(func() {
         s.run(context.WithoutCancel(ctx))
-    }()
+    })
     return s
 }

@@ -130,13 +130,7 @@ func TestTimerConcurrentStop(t *testing.T) {
     var wg sync.WaitGroup
     for range 10 {
-        wg.Add(1)
-        go func() {
-            defer wg.Done()
-            timer.Stop()
-        }()
+        wg.Go(timer.Stop)
     }
     wg.Wait()

@@ -48,11 +48,7 @@ func NewPool[T any](numWorkers int) *Pool[T] {
     }
     for range numWorkers {
-        w.wg.Add(1)
-        go func() {
-            defer w.wg.Done()
+        w.wg.Go(func() {
             for {
                 select {
                 case it := <-w.work:
@@ -66,7 +62,7 @@ func NewPool[T any](numWorkers int) *Pool[T] {
                     return
                 }
             }
-        }()
+        })
     }
     return w

@@ -263,15 +263,11 @@ func TestRCloneProviders(t *testing.T) {
     prefix := uuid.NewString()
     for i := range 10 {
-        wg.Add(1)
-        go func() {
-            defer wg.Done()
+        wg.Go(func() {
             for j := range 3 {
                 assert.NoError(t, st.PutBlob(ctx, blob.ID(fmt.Sprintf("%v-%v-%v", prefix, i, j)), gather.FromSlice([]byte{1, 2, 3}), blob.PutOptions{}))
             }
-        }()
+        })
     }
     wg.Wait()

@@ -112,15 +112,11 @@ func testRateLimiting(t *testing.T, name string, wantRate float64, worker func(t
     var wg sync.WaitGroup
     for range numWorkers {
-        wg.Add(1)
-        go func() {
-            defer wg.Done()
+        wg.Go(func() {
             for clock.Now().Before(deadline) {
                 worker(total)
             }
-        }()
+        })
     }
     wg.Wait()

@@ -27,11 +27,7 @@ func TestThrottlingSemaphore(t *testing.T) {
     )
     for range 10 {
-        wg.Add(1)
-        go func() {
-            defer wg.Done()
+        wg.Go(func() {
             for range 10 {
                 s.Acquire()
@@ -55,7 +51,7 @@ func TestThrottlingSemaphore(t *testing.T) {
                 s.Release()
             }
-        }()
+        })
     }
     wg.Wait()

@@ -68,11 +68,7 @@ func maybeParallelExecutor(parallel int, originalCallback IterateCallback) (Iter
     // start N workers, each fetching from the shared channel and invoking the provided callback.
     // cleanup() must be called to for worker completion
     for range parallel {
-        wg.Add(1)
-        go func() {
-            defer wg.Done()
+        wg.Go(func() {
             for i := range workch {
                 if err := originalCallback(i); err != nil {
                     select {
@@ -81,7 +77,7 @@ func maybeParallelExecutor(parallel int, originalCallback IterateCallback) (Iter
                     }
                 }
             }
-        }()
+        })
     }
     return callback, cleanup

@@ -1045,11 +1045,7 @@ func (s *contentManagerSuite) TestParallelWrites(t *testing.T) {
     // start numWorkers, each writing random block and recording it
     for workerID := range numWorkers {
-        workersWG.Add(1)
-        go func() {
-            defer workersWG.Done()
+        workersWG.Go(func() {
             for !stopWorker.Load() {
                 id := writeContentAndVerify(ctx, t, bm, seededRandomData(rand.Int(), 100))
@@ -1059,7 +1055,7 @@ func (s *contentManagerSuite) TestParallelWrites(t *testing.T) {
                 workerLock.RUnlock()
             }
-        }()
+        })
     }
     flush := func() {
@@ -1128,11 +1124,7 @@ func (s *contentManagerSuite) TestFlushResumesWriters(t *testing.T) {
     var writeWG sync.WaitGroup
-    writeWG.Add(1)
-    go func() {
-        defer writeWG.Done()
+    writeWG.Go(func() {
         // start a write while flush is ongoing, the write will block on the condition variable
         <-resumeWrites
         t.Logf("write started")
@@ -1140,7 +1132,7 @@ func (s *contentManagerSuite) TestFlushResumesWriters(t *testing.T) {
         second = writeContentAndVerify(ctx, t, bm, []byte{3, 4, 5})
         t.Logf("write finished")
-    }()
+    })
     // flush will take 5 seconds, 1 second into that we will start a write
     bm.Flush(ctx)

@@ -108,11 +108,7 @@ type work struct {
     }()
     for range parallelFetches {
-        wg.Add(1)
-        go func() {
-            defer wg.Done()
+        wg.Go(func() {
             var tmp gather.WriteBuffer
             defer tmp.Close()
@@ -143,7 +139,7 @@ type work struct {
                     }
                 }
             }
-        }()
+        })
     }
     wg.Wait()

@@ -75,11 +75,7 @@ func (b Builder) sortedContents() []*Info {
     numWorkers := runtime.NumCPU()
     for worker := range numWorkers {
-        wg.Add(1)
-        go func() {
-            defer wg.Done()
+        wg.Go(func() {
             for i := range buckets {
                 if i%numWorkers == worker {
                     buck := buckets[i]
@@ -89,7 +85,7 @@ func (b Builder) sortedContents() []*Info {
                     })
                 }
             }
-        }()
+        })
     }
     wg.Wait()

@@ -75,11 +75,7 @@ func RewriteContents(ctx context.Context, rep repo.DirectRepositoryWriter, opt *
     var wg sync.WaitGroup
     for range opt.Parallel {
-        wg.Add(1)
-        go func() {
-            defer wg.Done()
+        wg.Go(func() {
             for c := range cnt {
                 if c.err != nil {
                     failedCount.Add(1)
@@ -136,7 +132,7 @@ func RewriteContents(ctx context.Context, rep repo.DirectRepositoryWriter, opt *
                     rewritten.Add(int64(c.PackedLength))
                 }
             }
-        }()
+        })
     }
     wg.Wait()

@@ -1 +1 @@
-1.24.x
+1.25.x

@@ -1,6 +1,6 @@
 module github.com/kopia/kopia/site
-go 1.24
+go 1.25
 require (
     github.com/google/docsy v0.7.0 // indirect

@@ -275,11 +275,7 @@ func (v *Verifier) InParallel(ctx context.Context, enqueue func(tw *TreeWalker)
     v.fileWorkQueue = make(chan verifyFileWorkItem, v.opts.FileQueueLength)
     for range v.opts.Parallelism {
-        v.workersWG.Add(1)
-        go func() {
-            defer v.workersWG.Done()
+        v.workersWG.Go(func() {
             for wi := range v.fileWorkQueue {
                 if tw.TooManyErrors() {
                     continue
@@ -289,7 +285,7 @@ func (v *Verifier) InParallel(ctx context.Context, enqueue func(tw *TreeWalker)
                     tw.ReportError(ctx, wi.entryPath, err)
                 }
             }
-        }()
+        })
     }
     err := enqueue(tw)

@@ -105,11 +105,8 @@ func (e *estimator) StartEstimation(ctx context.Context, cb EstimationDoneFn) {
     scanCtx, cancelScan := context.WithCancel(ctx)
     e.cancelCtx = cancelScan
-    e.scanWG.Add(1)
-    go func() {
-        defer e.scanWG.Done()
+    e.scanWG.Go(func() {
         logger := estimateLog(ctx)
         var filesCount, totalFileSize int64
@@ -147,7 +144,7 @@ func (e *estimator) StartEstimation(ctx context.Context, cb EstimationDoneFn) {
         }
         cb(filesCount, totalFileSize)
-    }()
+    })
 }
 func (e *estimator) Wait() {

@@ -254,11 +254,8 @@ func killOnCondition(t *testing.T, cmd *exec.Cmd) {
     var wg sync.WaitGroup
-    // Add a WaitGroup counter for the first goroutine
-    wg.Add(1)
-    go func() {
-        defer wg.Done()
+    wg.Go(func() {
         // Create a scanner to read from stderrPipe
         scanner := bufio.NewScanner(stderrPipe)
         scanner.Split(bufio.ScanLines)
@@ -275,7 +272,7 @@ func killOnCondition(t *testing.T, cmd *exec.Cmd) {
                 break
             }
         }
-    }()
+    })
     // Start the command
     err = cmd.Start()

@@ -116,11 +116,8 @@ func TestPathLockBasic(t *testing.T) {
     var path2Err error
     wg := new(sync.WaitGroup)
-    wg.Add(1)
-    go func() {
-        defer wg.Done()
+    wg.Go(func() {
         lock2, err := pl.Lock(tc.path2)
         if err != nil {
             path2Err = err
@@ -128,7 +125,7 @@ func TestPathLockBasic(t *testing.T) {
         }
         lock2.Unlock()
-    }()
+    })
     // Wait until the internal atomic counter increments.
     // That will only happen once the Lock call to path2 executes
@@ -281,11 +278,7 @@ func TestPathLockRace(t *testing.T) {
     numGoroutines := 100
     for range numGoroutines {
-        wg.Add(1)
-        go func() {
-            defer wg.Done()
+        wg.Go(func() {
             // Pick from three different path values that should all be
             // covered by the same lock.
             path := "/some/path/a/b/c"
@@ -305,7 +298,7 @@ func TestPathLockRace(t *testing.T) {
             counter++
             lock.Unlock()
-        }()
+        })
     }
     wg.Wait()

@@ -305,11 +305,7 @@ func (e *CLITest) Run(tb testing.TB, expectedError bool, args ...string) (stdout
     var wg sync.WaitGroup
-    wg.Add(1)
-    go func() {
-        defer wg.Done()
+    wg.Go(func() {
         scanner := bufio.NewScanner(stdoutReader)
         for scanner.Scan() {
             if logOutput {
@@ -318,13 +314,9 @@ func (e *CLITest) Run(tb testing.TB, expectedError bool, args ...string) (stdout
             stdout = append(stdout, scanner.Text())
         }
-    }()
-    wg.Add(1)
-    go func() {
-        defer wg.Done()
+    })
+    wg.Go(func() {
         scanner := bufio.NewScanner(stderrReader)
         for scanner.Scan() {
             if logOutput {
@@ -333,7 +325,7 @@ func (e *CLITest) Run(tb testing.TB, expectedError bool, args ...string) (stdout
             stderr = append(stderr, scanner.Text())
         }
-    }()
+    })
     wg.Wait()