Compare commits

..

3 Commits

Author SHA1 Message Date
Jakob Borg
cde867cf74 chore(stdiscosrv): use log/slog
Signed-off-by: Jakob Borg <jakob@kastelo.net>
2025-11-30 11:17:29 +01:00
Jakob Borg
70292b4902 chore(stdiscosrv): larger write buffer
Signed-off-by: Jakob Borg <jakob@kastelo.net>
2025-11-30 11:17:29 +01:00
Jakob Borg
553c02f244 chore(model): refactor context handling for folder type (#10472)
Signed-off-by: Jakob Borg <jakob@kastelo.net>
2025-11-27 20:34:35 +00:00
16 changed files with 396 additions and 461 deletions

View File

@@ -10,7 +10,7 @@ import (
"context"
"fmt"
"io"
"log"
"log/slog"
amqp "github.com/rabbitmq/amqp091-go"
"github.com/thejerf/suture/v4"
@@ -173,7 +173,7 @@ func (s *amqpReceiver) Serve(ctx context.Context) error {
id, err = protocol.DeviceIDFromString(string(rec.Key))
}
if err != nil {
log.Println("Replication device ID:", err)
slog.Warn("Failed to parse replication device ID", "error", err)
replicationRecvsTotal.WithLabelValues("error").Inc()
continue
}

View File

@@ -18,6 +18,7 @@ import (
"fmt"
io "io"
"log"
"log/slog"
"math/rand"
"net"
"net/http"
@@ -93,7 +94,7 @@ func (s *apiSrv) Serve(ctx context.Context) error {
if s.useHTTP {
listener, err := net.Listen("tcp", s.addr)
if err != nil {
log.Println("Listen:", err)
slog.ErrorContext(ctx, "Failed to listen", "error", err)
return err
}
s.listener = listener
@@ -107,7 +108,7 @@ func (s *apiSrv) Serve(ctx context.Context) error {
tlsListener, err := tls.Listen("tcp", s.addr, tlsCfg)
if err != nil {
log.Println("Listen:", err)
slog.ErrorContext(ctx, "Failed to listen", "error", err)
return err
}
s.listener = tlsListener
@@ -132,7 +133,7 @@ func (s *apiSrv) Serve(ctx context.Context) error {
err := srv.Serve(s.listener)
if err != nil {
log.Println("Serve:", err)
slog.ErrorContext(ctx, "Failed to serve", "error", err)
}
return err
}
@@ -151,9 +152,7 @@ func (s *apiSrv) handler(w http.ResponseWriter, req *http.Request) {
reqID := requestID(rand.Int63())
req = req.WithContext(context.WithValue(req.Context(), idKey, reqID))
if debug {
log.Println(reqID, req.Method, req.URL, req.Proto)
}
slog.Debug("Handling request", "id", reqID, "method", req.Method, "url", req.URL, "proto", req.Proto)
remoteAddr := &net.TCPAddr{
IP: nil,
@@ -174,7 +173,7 @@ func (s *apiSrv) handler(w http.ResponseWriter, req *http.Request) {
var err error
remoteAddr, err = net.ResolveTCPAddr("tcp", req.RemoteAddr)
if err != nil {
log.Println("remoteAddr:", err)
slog.Warn("Failed to resolve remote address", "address", req.RemoteAddr, "error", err)
lw.Header().Set("Retry-After", errorRetryAfterString())
http.Error(lw, "Internal Server Error", http.StatusInternalServerError)
apiRequestsTotal.WithLabelValues("no_remote_addr").Inc()
@@ -197,9 +196,7 @@ func (s *apiSrv) handleGET(w http.ResponseWriter, req *http.Request) {
deviceID, err := protocol.DeviceIDFromString(req.URL.Query().Get("device"))
if err != nil {
if debug {
log.Println(reqID, "bad device param:", err)
}
slog.Debug("Request with bad device param", "id", reqID, "error", err)
lookupRequestsTotal.WithLabelValues("bad_request").Inc()
w.Header().Set("Retry-After", errorRetryAfterString())
http.Error(w, "Bad Request", http.StatusBadRequest)
@@ -259,9 +256,7 @@ func (s *apiSrv) handlePOST(remoteAddr *net.TCPAddr, w http.ResponseWriter, req
rawCert, err := certificateBytes(req)
if err != nil {
if debug {
log.Println(reqID, "no certificates:", err)
}
slog.Debug("Request without certificates", "id", reqID, "error", err)
announceRequestsTotal.WithLabelValues("no_certificate").Inc()
w.Header().Set("Retry-After", errorRetryAfterString())
http.Error(w, "Forbidden", http.StatusForbidden)
@@ -270,9 +265,7 @@ func (s *apiSrv) handlePOST(remoteAddr *net.TCPAddr, w http.ResponseWriter, req
var ann announcement
if err := json.NewDecoder(req.Body).Decode(&ann); err != nil {
if debug {
log.Println(reqID, "decode:", err)
}
slog.Debug("Failed to decode request", "id", reqID, "error", err)
announceRequestsTotal.WithLabelValues("bad_request").Inc()
w.Header().Set("Retry-After", errorRetryAfterString())
http.Error(w, "Bad Request", http.StatusBadRequest)
@@ -283,9 +276,7 @@ func (s *apiSrv) handlePOST(remoteAddr *net.TCPAddr, w http.ResponseWriter, req
addresses := fixupAddresses(remoteAddr, ann.Addresses)
if len(addresses) == 0 {
if debug {
log.Println(reqID, "no addresses")
}
slog.Debug("Request without addresses", "id", reqID, "error", err)
announceRequestsTotal.WithLabelValues("bad_request").Inc()
w.Header().Set("Retry-After", errorRetryAfterString())
http.Error(w, "Bad Request", http.StatusBadRequest)
@@ -293,9 +284,7 @@ func (s *apiSrv) handlePOST(remoteAddr *net.TCPAddr, w http.ResponseWriter, req
}
if err := s.handleAnnounce(deviceID, addresses); err != nil {
if debug {
log.Println(reqID, "handle:", err)
}
slog.Debug("Failed to handle request", "id", reqID, "error", err)
announceRequestsTotal.WithLabelValues("internal_error").Inc()
w.Header().Set("Retry-After", errorRetryAfterString())
http.Error(w, "Internal Server Error", http.StatusInternalServerError)
@@ -306,9 +295,7 @@ func (s *apiSrv) handlePOST(remoteAddr *net.TCPAddr, w http.ResponseWriter, req
w.Header().Set("Reannounce-After", reannounceAfterString())
w.WriteHeader(http.StatusNoContent)
if debug {
log.Println(reqID, "announced", deviceID, addresses)
}
slog.Debug("Device announced", "id", reqID, "device", deviceID, "addresses", addresses)
}
func (s *apiSrv) Stop() {

View File

@@ -13,7 +13,7 @@ import (
"encoding/binary"
"errors"
"io"
"log"
"log/slog"
"os"
"path"
"runtime"
@@ -74,24 +74,24 @@ func newInMemoryStore(dir string, flushInterval time.Duration, blobs blob.Store)
// Try to read from blob storage
latestKey, cerr := blobs.LatestKey(context.Background())
if cerr != nil {
log.Println("Error finding database from blob storage:", cerr)
slog.Error("Failed to find database in blob storage", "error", cerr)
return s
}
fd, cerr := os.Create(path.Join(s.dir, "records.db"))
if cerr != nil {
log.Println("Error creating database file:", cerr)
slog.Error("Failed to create database file", "error", cerr)
return s
}
if cerr := blobs.Download(context.Background(), latestKey, fd); cerr != nil {
log.Printf("Error downloading database from blob storage: %v", cerr)
slog.Error("Failed to download database from blob storage", "error", cerr)
}
_ = fd.Close()
nr, err = s.read()
}
if err != nil {
log.Println("Error reading database:", err)
slog.Error("Failed to read database", "error", err)
}
log.Printf("Read %d records from database", nr)
slog.Info("Loaded database", "records", nr)
s.expireAndCalculateStatistics()
return s
}
@@ -153,13 +153,13 @@ loop:
for {
select {
case <-t.C:
log.Println("Calculating statistics")
slog.InfoContext(ctx, "Calculating statistics")
s.expireAndCalculateStatistics()
log.Println("Flushing database")
slog.InfoContext(ctx, "Flushing database")
if err := s.write(); err != nil {
log.Println("Error writing database:", err)
slog.ErrorContext(ctx, "Failed to write database", "error", err)
}
log.Println("Finished flushing database")
slog.InfoContext(ctx, "Finished flushing database")
t.Reset(s.flushInterval)
case <-ctx.Done():
@@ -256,7 +256,7 @@ func (s *inMemoryStore) write() (err error) {
if err != nil {
return err
}
bw := bufio.NewWriter(fd)
bw := bufio.NewWriterSize(fd, 1<<20)
var buf []byte
var rangeErr error
@@ -310,18 +310,24 @@ func (s *inMemoryStore) write() (err error) {
return err
}
if info, err := os.Lstat(dbf); err == nil {
slog.Info("Saved database", "name", dbf, "size", info.Size(), "modtime", info.ModTime())
} else {
slog.Warn("Failed to stat database after save", "error", err)
}
// Upload to blob storage
if s.blobs != nil {
fd, err = os.Open(dbf)
if err != nil {
log.Printf("Error uploading database to blob storage: %v", err)
slog.Error("Failed to upload database to blob storage", "error", err)
return nil
}
defer fd.Close()
if err := s.blobs.Upload(context.Background(), s.objKey, fd); err != nil {
log.Printf("Error uploading database to blob storage: %v", err)
slog.Error("Failed to upload database to blob storage", "error", err)
}
log.Println("Finished uploading database")
slog.Info("Finished uploading database")
}
return nil
@@ -360,7 +366,7 @@ func (s *inMemoryStore) read() (int, error) {
key, err = protocol.DeviceIDFromString(string(rec.Key))
}
if err != nil {
log.Println("Bad device ID:", err)
slog.Error("Got bad device ID while reading database", "error", err)
continue
}

View File

@@ -9,9 +9,8 @@ package main
import (
"context"
"crypto/tls"
"log"
"log/slog"
"net/http"
_ "net/http/pprof"
"os"
"os/signal"
"runtime"
@@ -24,6 +23,7 @@ import (
"github.com/syncthing/syncthing/internal/blob"
"github.com/syncthing/syncthing/internal/blob/azureblob"
"github.com/syncthing/syncthing/internal/blob/s3"
"github.com/syncthing/syncthing/internal/slogutil"
_ "github.com/syncthing/syncthing/lib/automaxprocs"
"github.com/syncthing/syncthing/lib/build"
"github.com/syncthing/syncthing/lib/protocol"
@@ -32,8 +32,7 @@ import (
)
const (
addressExpiryTime = 2 * time.Hour
databaseStatisticsInterval = 5 * time.Minute
addressExpiryTime = 2 * time.Hour
// Reannounce-After is set to reannounceAfterSeconds +
// random(reannounzeFuzzSeconds), similar for Retry-After
@@ -88,13 +87,16 @@ type CLI struct {
}
func main() {
log.SetOutput(os.Stdout)
var cli CLI
kong.Parse(&cli)
debug = cli.Debug
log.Println(build.LongVersionFor("stdiscosrv"))
level := slog.LevelInfo
if cli.Debug {
level = slog.LevelDebug
}
slogutil.SetDefaultLevel(level)
slog.Info(build.LongVersionFor("stdiscosrv"))
if cli.Version {
return
}
@@ -106,16 +108,18 @@ func main() {
var err error
cert, err = tls.LoadX509KeyPair(cli.Cert, cli.Key)
if os.IsNotExist(err) {
log.Println("Failed to load keypair. Generating one, this might take a while...")
slog.Info("Failed to load keypair. Generating one, this might take a while...")
cert, err = tlsutil.NewCertificate(cli.Cert, cli.Key, "stdiscosrv", 20*365, false)
if err != nil {
log.Fatalln("Failed to generate X509 key pair:", err)
slog.Error("Failed to generate X509 key pair", "error", err)
os.Exit(1)
}
} else if err != nil {
log.Fatalln("Failed to load keypair:", err)
slog.Error("Failed to load keypair", "error", err)
os.Exit(1)
}
devID := protocol.NewDeviceID(cert.Certificate[0])
log.Println("Server device ID is", devID)
slog.Info("Loaded certificate keypair", "deviceID", devID)
}
// Root of the service tree.
@@ -133,7 +137,8 @@ func main() {
blobs, err = azureblob.NewBlobStore(cli.DBAzureBlobAccount, cli.DBAzureBlobKey, cli.DBAzureBlobContainer)
}
if err != nil {
log.Fatalf("Failed to create blob store: %v", err)
slog.Error("Failed to create blob store", "error", err)
os.Exit(1)
}
// Start the database.
@@ -158,7 +163,9 @@ func main() {
go func() {
mux := http.NewServeMux()
mux.Handle("/metrics", promhttp.Handler())
log.Fatal(http.ListenAndServe(cli.MetricsListen, mux))
err := http.ListenAndServe(cli.MetricsListen, mux)
slog.Error("Failed to serve", "error", err)
os.Exit(1)
}()
}
@@ -170,7 +177,7 @@ func main() {
signal.Notify(signalChan, os.Interrupt)
go func() {
sig := <-signalChan
log.Printf("Received signal %s; shutting down", sig)
slog.Info("Received signal; shutting down", "signal", sig)
cancel()
}()

8
go.mod
View File

@@ -76,7 +76,7 @@ require (
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/lufia/plan9stats v0.0.0-20240909124753-873cd0166683 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/maxbrunsfeld/counterfeiter/v6 v6.11.3 // indirect
github.com/maxbrunsfeld/counterfeiter/v6 v6.12.0 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/ncruces/go-strftime v0.1.9 // indirect
github.com/nxadm/tail v1.4.11 // indirect
@@ -97,10 +97,10 @@ require (
github.com/tklauser/numcpus v0.10.0 // indirect
github.com/yusufpapurcu/wmi v1.2.4 // indirect
go.yaml.in/yaml/v2 v2.4.2 // indirect
golang.org/x/mod v0.29.0 // indirect
golang.org/x/mod v0.30.0 // indirect
golang.org/x/sync v0.18.0 // indirect
golang.org/x/telemetry v0.0.0-20251008203120-078029d740a8 // indirect
golang.org/x/tools v0.38.0 // indirect
golang.org/x/telemetry v0.0.0-20251111182119-bc8e575c7b54 // indirect
golang.org/x/tools v0.39.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
modernc.org/libc v1.66.3 // indirect
modernc.org/mathutil v1.7.1 // indirect

24
go.sum
View File

@@ -163,8 +163,8 @@ github.com/maruel/panicparse/v2 v2.5.0/go.mod h1:DA2fDiBk63bKfBf4CVZP9gb4fuvzdPb
github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/mattn/go-sqlite3 v1.14.22/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y=
github.com/maxbrunsfeld/counterfeiter/v6 v6.11.3 h1:Eaq36EIyJNp7b3qDhjV7jmDVq/yPeW2v4pTqzGbOGB4=
github.com/maxbrunsfeld/counterfeiter/v6 v6.11.3/go.mod h1:6KKUoQBZBW6PDXJtNfqeEjPXMj/ITTk+cWK9t9uS5+E=
github.com/maxbrunsfeld/counterfeiter/v6 v6.12.0 h1:aOeI7xAOVdK+R6xbVsZuU9HmCZYmQVmZgPf9xJUd2Sg=
github.com/maxbrunsfeld/counterfeiter/v6 v6.12.0/go.mod h1:0hZWbtfeCYUQeAQdPLUzETiBhUSns7O6LDj9vH88xKA=
github.com/maxmind/geoipupdate/v6 v6.1.0 h1:sdtTHzzQNJlXF5+fd/EoPTucRHyMonYt/Cok8xzzfqA=
github.com/maxmind/geoipupdate/v6 v6.1.0/go.mod h1:cZYCDzfMzTY4v6dKRdV7KTB6SStxtn3yFkiJ1btTGGc=
github.com/miscreant/miscreant.go v0.0.0-20200214223636-26d376326b75 h1:cUVxyR+UfmdEAZGJ8IiKld1O0dbGotEnkMolG5hfMSY=
@@ -187,8 +187,8 @@ github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7J
github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo=
github.com/onsi/gomega v1.17.0/go.mod h1:HnhC7FXeEQY45zxNK3PPoIUhzk/80Xly9PcubAlGdZY=
github.com/onsi/gomega v1.19.0/go.mod h1:LY+I3pBVzYsTBU1AnDwOSxaYi9WoWiqgwooUqq9yPro=
github.com/onsi/gomega v1.37.0 h1:CdEG8g0S133B4OswTDC/5XPSzE1OeP29QOioj2PID2Y=
github.com/onsi/gomega v1.37.0/go.mod h1:8D9+Txp43QWKhM24yyOBEdpkzN8FvJyAwecBgsU4KU0=
github.com/onsi/gomega v1.38.2 h1:eZCjf2xjZAqe+LeWvKb5weQ+NcPwX84kqJ0cZNxok2A=
github.com/onsi/gomega v1.38.2/go.mod h1:W2MJcYxRGV63b418Ai34Ud0hEdTVXq9NW9+Sx6uXf3k=
github.com/oschwald/geoip2-golang v1.13.0 h1:Q44/Ldc703pasJeP5V9+aFSZFmBN7DKHbNsSFzQATJI=
github.com/oschwald/geoip2-golang v1.13.0/go.mod h1:P9zG+54KPEFOliZ29i7SeYZ/GM6tfEL+rgSn03hYuUo=
github.com/oschwald/maxminddb-golang v1.13.1 h1:G3wwjdN9JmIK2o/ermkHM+98oX5fS+k5MbwsmL4MRQE=
@@ -279,8 +279,8 @@ go.uber.org/mock v0.5.2 h1:LbtPTcP8A5k9WPXj54PPPbjcI4Y6lhyOZXn+VS7wNko=
go.uber.org/mock v0.5.2/go.mod h1:wLlUxC2vVTPTaE3UD51E0BGOAElKrILxhVSDYQLld5o=
go.yaml.in/yaml/v2 v2.4.2 h1:DzmwEr2rDGHl7lsFgAHxmNz/1NlQ7xLIrlN2h5d1eGI=
go.yaml.in/yaml/v2 v2.4.2/go.mod h1:081UH+NErpNdqlCXm3TtEran0rJZGxAYx9hb/ELlsPU=
go.yaml.in/yaml/v3 v3.0.3 h1:bXOww4E/J3f66rav3pX3m8w6jDE4knZjGOw8b5Y6iNE=
go.yaml.in/yaml/v3 v3.0.3/go.mod h1:tBHosrYAkRZjRAOREWbDnBXUf08JOwYq++0QNwQiWzI=
go.yaml.in/yaml/v3 v3.0.4 h1:tfq32ie2Jv2UxXFdLJdh3jXuOzWiL1fo0bu/FbuKpbc=
go.yaml.in/yaml/v3 v3.0.4/go.mod h1:DhzuOOF2ATzADvBadXxruRBLzYTpT36CKvDb3+aBEFg=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
@@ -289,8 +289,8 @@ golang.org/x/crypto v0.44.0/go.mod h1:013i+Nw79BMiQiMsOPcVCB5ZIJbYkerPrGnOa00tvm
golang.org/x/exp v0.0.0-20250811191247-51f88131bc50 h1:3yiSh9fhy5/RhCSntf4Sy0Tnx50DmMpQ4MQdKKk4yg4=
golang.org/x/exp v0.0.0-20250811191247-51f88131bc50/go.mod h1:rT6SFzZ7oxADUDx58pcaKFTcZ+inxAa9fTrYx/uVYwg=
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.29.0 h1:HV8lRxZC4l2cr3Zq1LvtOsi/ThTgWnUk/y64QSs8GwA=
golang.org/x/mod v0.29.0/go.mod h1:NyhrlYXJ2H4eJiRy/WDBO6HMqZQ6q9nk4JzS3NuCK+w=
golang.org/x/mod v0.30.0 h1:fDEXFVZ/fmCKProc/yAXXUijritrDzahmwwefnjoPFk=
golang.org/x/mod v0.30.0/go.mod h1:lAsf5O2EvJeSFMiBxXDki7sCgAxEUcZHXoXMKT4GJKc=
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
@@ -330,8 +330,8 @@ golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.38.0 h1:3yZWxaJjBmCWXqhN1qh02AkOnCQ1poK6oF+a7xWL6Gc=
golang.org/x/sys v0.38.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks=
golang.org/x/telemetry v0.0.0-20251008203120-078029d740a8 h1:LvzTn0GQhWuvKH/kVRS3R3bVAsdQWI7hvfLHGgh9+lU=
golang.org/x/telemetry v0.0.0-20251008203120-078029d740a8/go.mod h1:Pi4ztBfryZoJEkyFTI5/Ocsu2jXyDr6iSdgJiYE/uwE=
golang.org/x/telemetry v0.0.0-20251111182119-bc8e575c7b54 h1:E2/AqCUMZGgd73TQkxUMcMla25GB9i/5HOdLr+uH7Vo=
golang.org/x/telemetry v0.0.0-20251111182119-bc8e575c7b54/go.mod h1:hKdjCMrbv9skySur+Nek8Hd0uJ0GuxJIoIX2payrIdQ=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
@@ -345,8 +345,8 @@ golang.org/x/time v0.12.0/go.mod h1:CDIdPxbZBQxdj6cxyCIdrNogrJKMJ7pr37NYpMcMDSg=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.38.0 h1:Hx2Xv8hISq8Lm16jvBZ2VQf+RLmbd7wVUsALibYI/IQ=
golang.org/x/tools v0.38.0/go.mod h1:yEsQ/d/YK8cjh0L6rZlY8tgtlKiBNTL14pGDJPJpYQs=
golang.org/x/tools v0.39.0 h1:ik4ho21kwuQln40uelmciQPp9SipgNDdrafrYA4TmQQ=
golang.org/x/tools v0.39.0/go.mod h1:JnefbkDPyD8UU2kI5fuf8ZX4/yUeh9W877ZeBONxUqQ=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=

View File

@@ -55,8 +55,7 @@ type folder struct {
ignores *ignore.Matcher
mtimefs fs.Filesystem
modTimeWindow time.Duration
ctx context.Context //nolint:containedctx // used internally, only accessible on serve lifetime
done chan struct{} // used externally, accessible regardless of serve
done chan struct{} // used externally, accessible regardless of serve
sl *slog.Logger
scanInterval time.Duration
@@ -94,12 +93,12 @@ type folder struct {
}
type syncRequest struct {
fn func() error
fn func(context.Context) error
err chan error
}
type puller interface {
pull() (bool, error) // true when successful and should not be retried
pull(ctx context.Context) (bool, error) // true when successful and should not be retried
}
func newFolder(model *model, ignores *ignore.Matcher, cfg config.FolderConfiguration, evLogger events.Logger, ioLimiter *semaphore.Semaphore, ver versioner.Versioner) *folder {
@@ -151,10 +150,8 @@ func (f *folder) Serve(ctx context.Context) error {
f.model.foldersRunning.Add(1)
defer f.model.foldersRunning.Add(-1)
f.ctx = ctx
l.Debugln(f, "starting")
defer l.Debugln(f, "exiting")
f.sl.DebugContext(ctx, "Folder starting")
defer f.sl.DebugContext(ctx, "Folder exiting")
defer func() {
f.scanTimer.Stop()
@@ -163,7 +160,7 @@ func (f *folder) Serve(ctx context.Context) error {
}()
if f.FSWatcherEnabled && f.getHealthErrorAndLoadIgnores() == nil {
f.startWatch()
f.startWatch(ctx)
}
// If we're configured to not do version cleanup, or we don't have a
@@ -182,7 +179,7 @@ func (f *folder) Serve(ctx context.Context) error {
var err error
select {
case <-f.ctx.Done():
case <-ctx.Done():
close(f.done)
return nil
@@ -197,16 +194,16 @@ func (f *folder) Serve(ctx context.Context) error {
}
pullTimer.Reset(time.Duration(float64(time.Second) * f.PullerDelayS))
} else {
_, err = f.pull()
_, err = f.pull(ctx)
}
case <-pullTimer.C:
f.setState(FolderIdle)
_, err = f.pull()
_, err = f.pull(ctx)
case <-f.pullFailTimer.C:
var success bool
success, err = f.pull()
success, err = f.pull(ctx)
if (err != nil || !success) && f.pullPause < 60*f.pullBasePause() {
// Back off from retrying to pull
f.pullPause *= 2
@@ -215,46 +212,46 @@ func (f *folder) Serve(ctx context.Context) error {
case <-initialCompleted:
// Initial scan has completed, we should do a pull
initialCompleted = nil // never hit this case again
_, err = f.pull()
_, err = f.pull(ctx)
case <-f.forcedRescanRequested:
err = f.handleForcedRescans()
err = f.handleForcedRescans(ctx)
case <-f.scanTimer.C:
l.Debugln(f, "Scanning due to timer")
err = f.scanTimerFired()
f.sl.DebugContext(ctx, "Scanning due to timer")
err = f.scanTimerFired(ctx)
case req := <-f.doInSyncChan:
l.Debugln(f, "Running something due to request")
err = req.fn()
f.sl.DebugContext(ctx, "Running something due to request")
err = req.fn(ctx)
req.err <- err
case next := <-f.scanDelay:
l.Debugln(f, "Delaying scan")
f.sl.DebugContext(ctx, "Delaying scan")
f.scanTimer.Reset(next)
case <-f.scanScheduled:
l.Debugln(f, "Scan was scheduled")
f.sl.DebugContext(ctx, "Scan was scheduled")
f.scanTimer.Reset(0)
case fsEvents := <-f.watchChan:
l.Debugln(f, "Scan due to watcher")
err = f.scanSubdirs(fsEvents)
f.sl.DebugContext(ctx, "Scan due to watcher")
err = f.scanSubdirs(ctx, fsEvents)
case <-f.restartWatchChan:
l.Debugln(f, "Restart watcher")
err = f.restartWatch()
f.sl.DebugContext(ctx, "Restart watcher")
err = f.restartWatch(ctx)
case <-f.versionCleanupTimer.C:
l.Debugln(f, "Doing version cleanup")
f.versionCleanupTimerFired()
f.sl.DebugContext(ctx, "Doing version cleanup")
f.versionCleanupTimerFired(ctx)
}
if err != nil {
if svcutil.IsFatal(err) {
return err
}
f.setError(err)
f.setError(ctx, err)
}
}
}
@@ -303,12 +300,14 @@ func (*folder) Jobs(_, _ int) ([]string, []string, int) {
func (f *folder) Scan(subdirs []string) error {
<-f.initialScanFinished
return f.doInSync(func() error { return f.scanSubdirs(subdirs) })
return f.doInSync(func(ctx context.Context) error {
return f.scanSubdirs(ctx, subdirs)
})
}
// doInSync allows to run functions synchronously in folder.serve from exported,
// asynchronously called methods.
func (f *folder) doInSync(fn func() error) error {
func (f *folder) doInSync(fn func(context.Context) error) error {
req := syncRequest{
fn: fn,
err: make(chan error, 1),
@@ -329,7 +328,7 @@ func (f *folder) Reschedule() {
// Sleep a random time between 3/4 and 5/4 of the configured interval.
sleepNanos := (f.scanInterval.Nanoseconds()*3 + rand.Int63n(2*f.scanInterval.Nanoseconds())) / 4 //nolint:gosec
interval := time.Duration(sleepNanos) * time.Nanosecond
l.Debugln(f, "next rescan in", interval)
f.sl.Debug("Next rescan scheduled", slog.Duration("interval", interval))
f.scanTimer.Reset(interval)
}
@@ -365,7 +364,7 @@ func (f *folder) getHealthErrorWithoutIgnores() error {
return nil
}
func (f *folder) pull() (success bool, err error) {
func (f *folder) pull(ctx context.Context) (success bool, err error) {
f.pullFailTimer.Stop()
select {
case <-f.pullFailTimer.C:
@@ -402,7 +401,7 @@ func (f *folder) pull() (success bool, err error) {
// Abort early (before acquiring a token) if there's a folder error
err = f.getHealthErrorWithoutIgnores()
if err != nil {
l.Debugln("Skipping pull of", f.Description(), "due to folder error:", err)
f.sl.DebugContext(ctx, "Skipping pull due to folder error", slogutil.Error(err))
return false, err
}
@@ -411,7 +410,7 @@ func (f *folder) pull() (success bool, err error) {
if f.Type != config.FolderTypeSendOnly {
f.setState(FolderSyncWaiting)
if err := f.ioLimiter.TakeWithContext(f.ctx, 1); err != nil {
if err := f.ioLimiter.TakeWithContext(ctx, 1); err != nil {
return true, err
}
defer f.ioLimiter.Give(1)
@@ -428,12 +427,12 @@ func (f *folder) pull() (success bool, err error) {
}()
err = f.getHealthErrorAndLoadIgnores()
if err != nil {
l.Debugln("Skipping pull of", f.Description(), "due to folder error:", err)
f.sl.DebugContext(ctx, "Skipping pull due to folder error", slogutil.Error(err))
return false, err
}
f.setError(nil)
f.setError(ctx, nil)
success, err = f.puller.pull()
success, err = f.puller.pull(ctx)
if success && err == nil {
return true, nil
@@ -441,14 +440,14 @@ func (f *folder) pull() (success bool, err error) {
// Pulling failed, try again later.
delay := f.pullPause + time.Since(startTime)
f.sl.Info("Folder failed to sync, will be retried", slog.String("wait", stringutil.NiceDurationString(delay)))
f.sl.InfoContext(ctx, "Folder failed to sync, will be retried", slog.String("wait", stringutil.NiceDurationString(delay)))
f.pullFailTimer.Reset(delay)
return false, err
}
func (f *folder) scanSubdirs(subDirs []string) error {
l.Debugf("%v scanning", f)
func (f *folder) scanSubdirs(ctx context.Context, subDirs []string) error {
f.sl.DebugContext(ctx, "Scanning")
oldHash := f.ignores.Hash()
@@ -456,14 +455,14 @@ func (f *folder) scanSubdirs(subDirs []string) error {
if err != nil {
return err
}
f.setError(nil)
f.setError(ctx, nil)
// Check on the way out if the ignore patterns changed as part of scanning
// this folder. If they did we should schedule a pull of the folder so that
// we request things we might have suddenly become unignored and so on.
defer func() {
if f.ignores.Hash() != oldHash {
l.Debugln("Folder", f.Description(), "ignore patterns change detected while scanning; triggering puller")
f.sl.DebugContext(ctx, "Ignore patterns change detected while scanning; triggering puller")
f.ignoresUpdated()
f.SchedulePull()
}
@@ -472,15 +471,15 @@ func (f *folder) scanSubdirs(subDirs []string) error {
f.setState(FolderScanWaiting)
defer f.setState(FolderIdle)
if err := f.ioLimiter.TakeWithContext(f.ctx, 1); err != nil {
if err := f.ioLimiter.TakeWithContext(ctx, 1); err != nil {
return err
}
defer f.ioLimiter.Give(1)
metricFolderScans.WithLabelValues(f.ID).Inc()
ctx, cancel := context.WithCancel(f.ctx)
scanCtx, cancel := context.WithCancel(ctx)
defer cancel()
go addTimeUntilCancelled(ctx, metricFolderScanSeconds.WithLabelValues(f.ID))
go addTimeUntilCancelled(scanCtx, metricFolderScanSeconds.WithLabelValues(f.ID))
for i := range subDirs {
sub := osutil.NativeFilename(subDirs[i])
@@ -512,13 +511,13 @@ func (f *folder) scanSubdirs(subDirs []string) error {
// changes.
changes := 0
defer func() {
l.Debugf("%v finished scanning, detected %v changes", f, changes)
f.sl.DebugContext(ctx, "Finished scanning", slog.Int("changes", changes))
if changes > 0 {
f.SchedulePull()
}
}()
changesHere, err := f.scanSubdirsChangedAndNew(subDirs, batch)
changesHere, err := f.scanSubdirsChangedAndNew(ctx, subDirs, batch)
changes += changesHere
if err != nil {
return err
@@ -537,7 +536,7 @@ func (f *folder) scanSubdirs(subDirs []string) error {
// Do a scan of the database for each prefix, to check for deleted and
// ignored files.
changesHere, err = f.scanSubdirsDeletedAndIgnored(subDirs, batch)
changesHere, err = f.scanSubdirsDeletedAndIgnored(ctx, subDirs, batch)
changes += changesHere
if err != nil {
return err
@@ -566,7 +565,7 @@ func (f *folder) newScanBatch() *scanBatch {
}
b.updateBatch = NewFileInfoBatch(func(fs []protocol.FileInfo) error {
if err := b.f.getHealthErrorWithoutIgnores(); err != nil {
l.Debugf("Stopping scan of folder %s due to: %s", b.f.Description(), err)
b.f.sl.Debug("Stopping scan due to folder error", slogutil.Error(err))
return err
}
b.f.updateLocalsFromScanning(fs)
@@ -627,7 +626,7 @@ func (b *scanBatch) Update(fi protocol.FileInfo) (bool, error) {
// Our item is deleted and the global item is our own receive only
// file. No point in keeping track of that.
b.Remove(fi.Name)
l.Debugf("%v scanning: deleting deleted receive-only local-changed file: %v", b.f, fi)
b.f.sl.Debug("Deleting deleted receive-only local-changed file", slogutil.FilePath(fi.Name))
return true, nil
}
case (b.f.Type == config.FolderTypeReceiveOnly || b.f.Type == config.FolderTypeReceiveEncrypted) &&
@@ -640,19 +639,19 @@ func (b *scanBatch) Update(fi protocol.FileInfo) (bool, error) {
IgnoreXattrs: !b.f.SyncXattrs && !b.f.SendXattrs,
}):
// What we have locally is equivalent to the global file.
l.Debugf("%v scanning: Merging identical locally changed item with global: %v", b.f, fi)
b.f.sl.Debug("Merging identical locally changed item with global", slogutil.FilePath(fi.Name))
fi = gf
}
b.updateBatch.Append(fi)
return true, nil
}
func (f *folder) scanSubdirsChangedAndNew(subDirs []string, batch *scanBatch) (int, error) {
func (f *folder) scanSubdirsChangedAndNew(ctx context.Context, subDirs []string, batch *scanBatch) (int, error) {
changes := 0
// If we return early e.g. due to a folder health error, the scan needs
// to be cancelled.
scanCtx, scanCancel := context.WithCancel(f.ctx)
scanCtx, scanCancel := context.WithCancel(ctx)
defer scanCancel()
scanConfig := scanner.Config{
@@ -706,7 +705,7 @@ func (f *folder) scanSubdirsChangedAndNew(subDirs []string, batch *scanBatch) (i
switch f.Type {
case config.FolderTypeReceiveOnly, config.FolderTypeReceiveEncrypted:
default:
if nf, ok := f.findRename(res.File, alreadyUsedOrExisting); ok {
if nf, ok := f.findRename(ctx, res.File, alreadyUsedOrExisting); ok {
if ok, err := batch.Update(nf); err != nil {
return 0, err
} else if ok {
@@ -719,7 +718,7 @@ func (f *folder) scanSubdirsChangedAndNew(subDirs []string, batch *scanBatch) (i
return changes, nil
}
func (f *folder) scanSubdirsDeletedAndIgnored(subDirs []string, batch *scanBatch) (int, error) {
func (f *folder) scanSubdirsDeletedAndIgnored(ctx context.Context, subDirs []string, batch *scanBatch) (int, error) {
var toIgnore []protocol.FileInfo
ignoredParent := ""
changes := 0
@@ -732,7 +731,7 @@ outer:
}
select {
case <-f.ctx.Done():
case <-ctx.Done():
break outer
default:
}
@@ -743,7 +742,7 @@ outer:
if ignoredParent != "" && !fs.IsParent(fi.Name, ignoredParent) {
for _, file := range toIgnore {
l.Debugln("marking file as ignored", file)
f.sl.DebugContext(ctx, "Marking file as ignored", slogutil.FilePath(file.Name))
nf := file
nf.SetIgnored()
if ok, err := batch.Update(nf); err != nil {
@@ -775,7 +774,7 @@ outer:
continue
}
l.Debugln("marking file as ignored", fi)
f.sl.DebugContext(ctx, "Marking file as ignored", slogutil.FilePath(fi.Name))
nf := fi
nf.SetIgnored()
if ok, err := batch.Update(nf); err != nil {
@@ -810,7 +809,7 @@ outer:
// sure the file gets in sync on the following pull.
nf.Version = protocol.Vector{}
}
l.Debugln("marking file as deleted", nf)
f.sl.DebugContext(ctx, "Marking file as deleted", slogutil.FilePath(nf.Name))
if ok, err := batch.Update(nf); err != nil {
return 0, err
} else if ok {
@@ -824,13 +823,13 @@ outer:
return 0, err
case !ok:
case gf.IsReceiveOnlyChanged():
l.Debugln("removing deleted, receive-only item that is globally receive-only from db", fi)
f.sl.DebugContext(ctx, "Removing deleted receive-only item that is globally receive-only from db", slogutil.FilePath(fi.Name))
batch.Remove(fi.Name)
changes++
case gf.IsDeleted():
// Our item is deleted and the global item is deleted too. We just
// pretend it is a normal deleted file (nobody cares about that).
l.Debugf("%v scanning: Marking globally deleted item as not locally changed: %v", f, fi.Name)
f.sl.DebugContext(ctx, "Marking globally deleted item as not locally changed", slogutil.FilePath(fi.Name))
fi.LocalFlags &^= protocol.FlagLocalReceiveOnly
if ok, err := batch.Update(fi); err != nil {
return 0, err
@@ -842,7 +841,7 @@ outer:
// No need to bump the version for a file that was and is
// deleted and just the folder type/local flags changed.
fi.LocalFlags &^= protocol.FlagLocalReceiveOnly
l.Debugln("removing receive-only flag on deleted item", fi)
f.sl.DebugContext(ctx, "Removing receive-only flag on deleted item", slogutil.FilePath(fi.Name))
if ok, err := batch.Update(fi); err != nil {
return 0, err
} else if ok {
@@ -853,14 +852,14 @@ outer:
}
select {
case <-f.ctx.Done():
return changes, f.ctx.Err()
case <-ctx.Done():
return changes, ctx.Err()
default:
}
if len(toIgnore) > 0 {
for _, file := range toIgnore {
l.Debugln("marking file as ignored", file)
f.sl.DebugContext(ctx, "Marking file as ignored", slogutil.FilePath(file.Name))
nf := file
nf.SetIgnored()
if ok, err := batch.Update(nf); err != nil {
@@ -879,7 +878,7 @@ outer:
return changes, nil
}
func (f *folder) findRename(file protocol.FileInfo, alreadyUsedOrExisting map[string]struct{}) (protocol.FileInfo, bool) {
func (f *folder) findRename(ctx context.Context, file protocol.FileInfo, alreadyUsedOrExisting map[string]struct{}) (protocol.FileInfo, bool) {
if len(file.Blocks) == 0 || file.Size == 0 {
return protocol.FileInfo{}, false
}
@@ -894,7 +893,7 @@ loop:
}
select {
case <-f.ctx.Done():
case <-ctx.Done():
break loop
default:
}
@@ -943,16 +942,16 @@ loop:
return nf, found
}
func (f *folder) scanTimerFired() error {
err := f.scanSubdirs(nil)
func (f *folder) scanTimerFired(ctx context.Context) error {
err := f.scanSubdirs(ctx, nil)
select {
case <-f.initialScanFinished:
default:
if err != nil {
f.sl.Error("Failed initial scan", slogutil.Error(err))
f.sl.ErrorContext(ctx, "Failed initial scan", slogutil.Error(err))
} else {
f.sl.Info("Completed initial scan")
f.sl.InfoContext(ctx, "Completed initial scan")
}
close(f.initialScanFinished)
}
@@ -962,19 +961,19 @@ func (f *folder) scanTimerFired() error {
return err
}
func (f *folder) versionCleanupTimerFired() {
func (f *folder) versionCleanupTimerFired(ctx context.Context) {
f.setState(FolderCleanWaiting)
defer f.setState(FolderIdle)
if err := f.ioLimiter.TakeWithContext(f.ctx, 1); err != nil {
if err := f.ioLimiter.TakeWithContext(ctx, 1); err != nil {
return
}
defer f.ioLimiter.Give(1)
f.setState(FolderCleaning)
if err := f.versioner.Clean(f.ctx); err != nil {
f.sl.Warn("Failed to clean versions", slogutil.Error(err))
if err := f.versioner.Clean(ctx); err != nil {
f.sl.WarnContext(ctx, "Failed to clean versions", slogutil.Error(err))
}
f.versionCleanupTimer.Reset(f.versionCleanupInterval)
@@ -1008,21 +1007,21 @@ func (f *folder) scheduleWatchRestart() {
// restartWatch should only ever be called synchronously. If you want to use
// this asynchronously, you should probably use scheduleWatchRestart instead.
func (f *folder) restartWatch() error {
func (f *folder) restartWatch(ctx context.Context) error {
f.stopWatch()
f.startWatch()
return f.scanSubdirs(nil)
f.startWatch(ctx)
return f.scanSubdirs(ctx, nil)
}
// startWatch should only ever be called synchronously. If you want to use
// this asynchronously, you should probably use scheduleWatchRestart instead.
func (f *folder) startWatch() {
ctx, cancel := context.WithCancel(f.ctx)
func (f *folder) startWatch(ctx context.Context) {
watchCtx, cancel := context.WithCancel(ctx)
f.watchMut.Lock()
f.watchChan = make(chan []string)
f.watchCancel = cancel
f.watchMut.Unlock()
go f.monitorWatch(ctx)
go f.monitorWatch(watchCtx)
}
// monitorWatch starts the filesystem watching and retries every minute on failure.
@@ -1066,7 +1065,7 @@ func (f *folder) monitorWatch(ctx context.Context) {
}
lastWatch = time.Now()
watchaggregator.Aggregate(aggrCtx, eventChan, f.watchChan, f.FolderConfiguration, f.model.cfg, f.evLogger)
l.Debugln("Started filesystem watcher for folder", f.Description())
f.sl.DebugContext(ctx, "Started filesystem watcher")
case err = <-errChan:
var next time.Duration
if dur := time.Since(lastWatch); dur > pause {
@@ -1148,9 +1147,9 @@ func (f *folder) scanOnWatchErr() {
}
}
func (f *folder) setError(err error) {
func (f *folder) setError(ctx context.Context, err error) {
select {
case <-f.ctx.Done():
case <-ctx.Done():
return
default:
}
@@ -1162,12 +1161,12 @@ func (f *folder) setError(err error) {
if err != nil {
if oldErr == nil {
f.sl.Warn("Error on folder", slogutil.Error(err))
f.sl.WarnContext(ctx, "Error on folder", slogutil.Error(err))
} else {
f.sl.Info("Folder error changed", slogutil.Error(err), slog.Any("previously", oldErr))
f.sl.InfoContext(ctx, "Folder error changed", slogutil.Error(err), slog.Any("previously", oldErr))
}
} else {
f.sl.Info("Folder error cleared")
f.sl.InfoContext(ctx, "Folder error cleared")
f.SchedulePull()
}
@@ -1324,7 +1323,7 @@ func (f *folder) emitDiskChangeEvents(fs []protocol.FileInfo, typeOfEvent events
}
}
func (f *folder) handleForcedRescans() error {
func (f *folder) handleForcedRescans(ctx context.Context) error {
f.forcedRescanPathsMut.Lock()
paths := make([]string, 0, len(f.forcedRescanPaths))
for path := range f.forcedRescanPaths {
@@ -1360,7 +1359,7 @@ func (f *folder) handleForcedRescans() error {
return err
}
return f.scanSubdirs(paths)
return f.scanSubdirs(ctx, paths)
}
// The exists function is expected to return true for all known paths
@@ -1377,7 +1376,7 @@ func unifySubs(dirs []string, exists func(dir string) bool) []string {
for i := 0; i < len(dirs); {
dir, err := fs.Canonicalize(dirs[i])
if err != nil {
l.Debugf("Skipping %v for scan: %s", dirs[i], err)
slog.Debug("Skipping directory for scan", slog.String("dir", dirs[i]), slogutil.Error(err))
dirs = append(dirs[:i], dirs[i+1:]...)
continue
}

View File

@@ -7,6 +7,7 @@
package model
import (
"context"
"fmt"
"slices"
"strings"
@@ -39,8 +40,8 @@ func (f *receiveEncryptedFolder) Revert() {
f.doInSync(f.revert)
}
func (f *receiveEncryptedFolder) revert() error {
f.sl.Info("Reverting unexpected items")
func (f *receiveEncryptedFolder) revert(ctx context.Context) error {
f.sl.InfoContext(ctx, "Reverting unexpected items")
f.setState(FolderScanning)
defer f.setState(FolderIdle)
@@ -84,7 +85,7 @@ func (f *receiveEncryptedFolder) revert() error {
batch.Append(fi)
}
f.revertHandleDirs(dirs)
f.revertHandleDirs(ctx, dirs)
if err := batch.Flush(); err != nil {
return err
@@ -96,13 +97,13 @@ func (f *receiveEncryptedFolder) revert() error {
return nil
}
func (f *receiveEncryptedFolder) revertHandleDirs(dirs []string) {
func (f *receiveEncryptedFolder) revertHandleDirs(ctx context.Context, dirs []string) {
if len(dirs) == 0 {
return
}
scanChan := make(chan string)
go f.pullScannerRoutine(scanChan)
go f.pullScannerRoutine(ctx, scanChan)
defer close(scanChan)
slices.SortFunc(dirs, func(a, b string) int {

View File

@@ -7,6 +7,7 @@
package model
import (
"context"
"slices"
"strings"
"time"
@@ -69,14 +70,14 @@ func (f *receiveOnlyFolder) Revert() {
f.doInSync(f.revert)
}
func (f *receiveOnlyFolder) revert() error {
f.sl.Info("Reverting folder")
func (f *receiveOnlyFolder) revert(ctx context.Context) error {
f.sl.InfoContext(ctx, "Reverting folder")
f.setState(FolderScanning)
defer f.setState(FolderIdle)
scanChan := make(chan string)
go f.pullScannerRoutine(scanChan)
go f.pullScannerRoutine(ctx, scanChan)
defer close(scanChan)
delQueue := &deleteQueue{
@@ -155,7 +156,7 @@ func (f *receiveOnlyFolder) revert() error {
// Handle any queued directories
deleted, err := delQueue.flush()
if err != nil {
f.sl.Warn("Failed to revert directories", slogutil.Error(err))
f.sl.WarnContext(ctx, "Failed to revert directories", slogutil.Error(err))
}
now := time.Now()
for _, dir := range deleted {

View File

@@ -407,8 +407,8 @@ func TestRecvOnlyRemoteUndoChanges(t *testing.T) {
must(t, m.IndexUpdate(conn, &protocol.IndexUpdate{Folder: "ro", Files: files}))
// Ensure the pull to resolve conflicts (content identical) happened
must(t, f.doInSync(func() error {
f.pull()
must(t, f.doInSync(func(ctx context.Context) error {
f.pull(ctx)
return nil
}))
@@ -456,7 +456,7 @@ func TestRecvOnlyRevertOwnID(t *testing.T) {
// Monitor the outcome
sub := f.evLogger.Subscribe(events.LocalIndexUpdated)
defer sub.Unsubscribe()
ctx, cancel := context.WithCancel(context.Background())
ctx, cancel := context.WithCancel(t.Context())
defer cancel()
go func() {
defer cancel()
@@ -567,7 +567,7 @@ func setupKnownFiles(t *testing.T, ffs fs.Filesystem, data []byte) []protocol.Fi
if err != nil {
t.Fatal(err)
}
blocks, _ := scanner.Blocks(context.TODO(), bytes.NewReader(data), protocol.BlockSize(int64(len(data))), int64(len(data)), nil)
blocks, _ := scanner.Blocks(t.Context(), bytes.NewReader(data), protocol.BlockSize(int64(len(data))), int64(len(data)), nil)
knownFiles := []protocol.FileInfo{
{
Name: "knownDir",

View File

@@ -7,6 +7,8 @@
package model
import (
"context"
"github.com/syncthing/syncthing/internal/itererr"
"github.com/syncthing/syncthing/lib/config"
"github.com/syncthing/syncthing/lib/events"
@@ -37,7 +39,7 @@ func (*sendOnlyFolder) PullErrors() []FileError {
}
// pull checks need for files that only differ by metadata (no changes on disk)
func (f *sendOnlyFolder) pull() (bool, error) {
func (f *sendOnlyFolder) pull(ctx context.Context) (bool, error) {
batch := NewFileInfoBatch(func(files []protocol.FileInfo) error {
f.updateLocalsFromPulling(files)
return nil
@@ -92,8 +94,8 @@ func (f *sendOnlyFolder) Override() {
f.doInSync(f.override)
}
func (f *sendOnlyFolder) override() error {
f.sl.Info("Overriding global state ")
func (f *sendOnlyFolder) override(ctx context.Context) error {
f.sl.InfoContext(ctx, "Overriding global state ")
f.setState(FolderScanning)
defer f.setState(FolderIdle)

View File

@@ -161,20 +161,20 @@ func newSendReceiveFolder(model *model, ignores *ignore.Matcher, cfg config.Fold
// pull returns true if it manages to get all needed items from peers, i.e. get
// the device in sync with the global state.
func (f *sendReceiveFolder) pull() (bool, error) {
l.Debugf("%v pulling", f)
func (f *sendReceiveFolder) pull(ctx context.Context) (bool, error) {
f.sl.DebugContext(ctx, "Pulling")
scanChan := make(chan string)
go f.pullScannerRoutine(scanChan)
go f.pullScannerRoutine(ctx, scanChan)
defer func() {
close(scanChan)
f.setState(FolderIdle)
}()
metricFolderPulls.WithLabelValues(f.ID).Inc()
ctx, cancel := context.WithCancel(f.ctx)
pullCtx, cancel := context.WithCancel(ctx)
defer cancel()
go addTimeUntilCancelled(ctx, metricFolderPullSeconds.WithLabelValues(f.ID))
go addTimeUntilCancelled(pullCtx, metricFolderPullSeconds.WithLabelValues(f.ID))
changed := 0
@@ -185,8 +185,8 @@ func (f *sendReceiveFolder) pull() (bool, error) {
var err error
for tries := range maxPullerIterations {
select {
case <-f.ctx.Done():
return false, f.ctx.Err()
case <-ctx.Done():
return false, ctx.Err()
default:
}
@@ -194,12 +194,12 @@ func (f *sendReceiveFolder) pull() (bool, error) {
// it to FolderSyncing during the last iteration.
f.setState(FolderSyncPreparing)
changed, err = f.pullerIteration(scanChan)
changed, err = f.pullerIteration(ctx, scanChan)
if err != nil {
return false, err
}
l.Debugln(f, "changed", changed, "on try", tries+1)
f.sl.DebugContext(ctx, "Pull iteration completed", "changed", changed, "try", tries+1)
if changed == 0 {
// No files were changed by the puller, so we are in
@@ -214,7 +214,7 @@ func (f *sendReceiveFolder) pull() (bool, error) {
if pullErrNum > 0 {
f.pullErrors = make([]FileError, 0, len(f.tempPullErrors))
for path, err := range f.tempPullErrors {
f.sl.Warn("Failed to sync", slogutil.FilePath(path), slogutil.Error(err))
f.sl.WarnContext(ctx, "Failed to sync", slogutil.FilePath(path), slogutil.Error(err))
f.pullErrors = append(f.pullErrors, FileError{
Err: err,
Path: path,
@@ -238,7 +238,7 @@ func (f *sendReceiveFolder) pull() (bool, error) {
// returns the number items that should have been synced (even those that
// might have failed). One puller iteration handles all files currently
// flagged as needed in the folder.
func (f *sendReceiveFolder) pullerIteration(scanChan chan<- string) (int, error) {
func (f *sendReceiveFolder) pullerIteration(ctx context.Context, scanChan chan<- string) (int, error) {
f.errorsMut.Lock()
f.tempPullErrors = make(map[string]string)
f.errorsMut.Unlock()
@@ -253,7 +253,7 @@ func (f *sendReceiveFolder) pullerIteration(scanChan chan<- string) (int, error)
var doneWg sync.WaitGroup
var updateWg sync.WaitGroup
l.Debugln(f, "copiers:", f.Copiers, "pullerPendingKiB:", f.PullerMaxPendingKiB)
f.sl.DebugContext(ctx, "Starting puller iteration", "copiers", f.Copiers, "pullerPendingKiB", f.PullerMaxPendingKiB)
updateWg.Add(1)
go func() {
@@ -266,7 +266,7 @@ func (f *sendReceiveFolder) pullerIteration(scanChan chan<- string) (int, error)
copyWg.Add(1)
go func() {
// copierRoutine finishes when copyChan is closed
f.copierRoutine(copyChan, pullChan, finisherChan)
f.copierRoutine(ctx, copyChan, pullChan, finisherChan)
copyWg.Done()
}()
}
@@ -274,18 +274,18 @@ func (f *sendReceiveFolder) pullerIteration(scanChan chan<- string) (int, error)
pullWg.Add(1)
go func() {
// pullerRoutine finishes when pullChan is closed
f.pullerRoutine(pullChan, finisherChan)
f.pullerRoutine(ctx, pullChan, finisherChan)
pullWg.Done()
}()
doneWg.Add(1)
// finisherRoutine finishes when finisherChan is closed
go func() {
f.finisherRoutine(finisherChan, dbUpdateChan, scanChan)
f.finisherRoutine(ctx, finisherChan, dbUpdateChan, scanChan)
doneWg.Done()
}()
changed, fileDeletions, dirDeletions, err := f.processNeeded(dbUpdateChan, copyChan, scanChan)
changed, fileDeletions, dirDeletions, err := f.processNeeded(ctx, dbUpdateChan, copyChan, scanChan)
// Signal copy and puller routines that we are done with the in data for
// this iteration. Wait for them to finish.
@@ -300,7 +300,7 @@ func (f *sendReceiveFolder) pullerIteration(scanChan chan<- string) (int, error)
doneWg.Wait()
if err == nil {
f.processDeletions(fileDeletions, dirDeletions, dbUpdateChan, scanChan)
f.processDeletions(ctx, fileDeletions, dirDeletions, dbUpdateChan, scanChan)
}
// Wait for db updates and scan scheduling to complete
@@ -312,7 +312,7 @@ func (f *sendReceiveFolder) pullerIteration(scanChan chan<- string) (int, error)
return changed, err
}
func (f *sendReceiveFolder) processNeeded(dbUpdateChan chan<- dbUpdateJob, copyChan chan<- copyBlocksState, scanChan chan<- string) (int, map[string]protocol.FileInfo, []protocol.FileInfo, error) {
func (f *sendReceiveFolder) processNeeded(ctx context.Context, dbUpdateChan chan<- dbUpdateJob, copyChan chan<- copyBlocksState, scanChan chan<- string) (int, map[string]protocol.FileInfo, []protocol.FileInfo, error) {
changed := 0
var dirDeletions []protocol.FileInfo
fileDeletions := map[string]protocol.FileInfo{}
@@ -328,13 +328,13 @@ loop:
return changed, nil, nil, err
}
select {
case <-f.ctx.Done():
case <-ctx.Done():
break loop
default:
}
if f.IgnoreDelete && file.IsDeleted() {
l.Debugln(f, "ignore file deletion (config)", file.FileName())
f.sl.DebugContext(ctx, "Ignoring file deletion per config", slogutil.FilePath(file.FileName()))
continue
}
@@ -343,7 +343,7 @@ loop:
switch {
case f.ignores.Match(file.Name).IsIgnored():
file.SetIgnored()
l.Debugln(f, "Handling ignored file", file)
f.sl.DebugContext(ctx, "Handling ignored file", file.LogAttr())
dbUpdateChan <- dbUpdateJob{file, dbUpdateInvalidate}
case build.IsWindows && fs.WindowsInvalidFilename(file.Name) != nil:
@@ -409,17 +409,17 @@ loop:
break
}
file.SetUnsupported()
l.Debugln(f, "Invalidating symlink (unsupported)", file.Name)
f.sl.DebugContext(ctx, "Invalidating unsupported symlink", slogutil.FilePath(file.Name))
dbUpdateChan <- dbUpdateJob{file, dbUpdateInvalidate}
case file.IsDirectory() && !file.IsSymlink():
l.Debugln(f, "Handling directory", file.Name)
f.sl.DebugContext(ctx, "Handling directory", slogutil.FilePath(file.Name))
if f.checkParent(file.Name, scanChan) {
f.handleDir(file, dbUpdateChan, scanChan)
}
case file.IsSymlink():
l.Debugln(f, "Handling symlink", file.Name)
f.sl.DebugContext(ctx, "Handling symlink", slogutil.FilePath(file.Name))
if f.checkParent(file.Name, scanChan) {
f.handleSymlink(file, dbUpdateChan, scanChan)
}
@@ -430,8 +430,8 @@ loop:
}
select {
case <-f.ctx.Done():
return changed, nil, nil, f.ctx.Err()
case <-ctx.Done():
return changed, nil, nil, ctx.Err()
default:
}
@@ -440,8 +440,8 @@ loop:
nextFile:
for {
select {
case <-f.ctx.Done():
return changed, fileDeletions, dirDeletions, f.ctx.Err()
case <-ctx.Done():
return changed, fileDeletions, dirDeletions, ctx.Err()
default:
}
@@ -481,7 +481,7 @@ nextFile:
// map.
desired := fileDeletions[candidate.Name]
if err := f.renameFile(candidate, desired, fi, dbUpdateChan, scanChan); err != nil {
l.Debugf("rename shortcut for %s failed: %s", fi.Name, err.Error())
f.sl.DebugContext(ctx, "Rename shortcut failed", slogutil.FilePath(fi.Name), slogutil.Error(err))
// Failed to rename, try next one.
continue
}
@@ -510,7 +510,7 @@ nextFile:
continue
}
if err := f.handleFile(fi, copyChan); err != nil {
if err := f.handleFile(ctx, fi, copyChan); err != nil {
f.newPullError(fileName, err)
}
}
@@ -528,10 +528,10 @@ func popCandidate(buckets map[string][]protocol.FileInfo, key string) (protocol.
return cands[0], true
}
func (f *sendReceiveFolder) processDeletions(fileDeletions map[string]protocol.FileInfo, dirDeletions []protocol.FileInfo, dbUpdateChan chan<- dbUpdateJob, scanChan chan<- string) {
func (f *sendReceiveFolder) processDeletions(ctx context.Context, fileDeletions map[string]protocol.FileInfo, dirDeletions []protocol.FileInfo, dbUpdateChan chan<- dbUpdateJob, scanChan chan<- string) {
for _, file := range fileDeletions {
select {
case <-f.ctx.Done():
case <-ctx.Done():
return
default:
}
@@ -542,13 +542,13 @@ func (f *sendReceiveFolder) processDeletions(fileDeletions map[string]protocol.F
// Process in reverse order to delete depth first
for i := range dirDeletions {
select {
case <-f.ctx.Done():
case <-ctx.Done():
return
default:
}
dir := dirDeletions[len(dirDeletions)-i-1]
l.Debugln(f, "Deleting dir", dir.Name)
f.sl.DebugContext(ctx, "Deleting directory", slogutil.FilePath(dir.Name))
f.deleteDir(dir, dbUpdateChan, scanChan)
}
}
@@ -708,10 +708,10 @@ func (f *sendReceiveFolder) checkParent(file string, scanChan chan<- string) boo
// Encrypted files have made-up filenames with two synthetic parent
// directories which don't have any meaning. Create those if necessary.
if _, err := f.mtimefs.Lstat(parent); !fs.IsNotExist(err) {
l.Debugf("%v parent not missing %v", f, file)
f.sl.Debug("Parent directory exists", slogutil.FilePath(file))
return true
}
l.Debugf("%v creating parent directory of %v", f, file)
f.sl.Debug("Creating parent directory", slogutil.FilePath(file))
if err := f.mtimefs.MkdirAll(parent, 0o755); err != nil {
f.newPullError(file, fmt.Errorf("creating parent dir: %w", err))
return false
@@ -880,7 +880,7 @@ func (f *sendReceiveFolder) deleteFileWithCurrent(file, cur protocol.FileInfo, h
// care not declare another err.
var err error
l.Debugln(f, "Deleting file or symlink", file.Name)
f.sl.Debug("Deleting file or symlink", slogutil.FilePath(file.Name))
f.evLogger.Log(events.ItemStarted, map[string]string{
"folder": f.folderID,
@@ -1001,7 +1001,7 @@ func (f *sendReceiveFolder) renameFile(cur, source, target protocol.FileInfo, db
})
}()
l.Debugln(f, "taking rename shortcut", source.Name, "->", target.Name)
f.sl.Debug("Taking rename shortcut", "from", source.Name, "to", target.Name)
// Check that source is compatible with what we have in the DB
if err = f.checkToBeDeleted(source, cur, true, scanChan); err != nil {
@@ -1130,7 +1130,7 @@ func (f *sendReceiveFolder) renameFile(cur, source, target protocol.FileInfo, db
// handleFile queues the copies and pulls as necessary for a single new or
// changed file.
func (f *sendReceiveFolder) handleFile(file protocol.FileInfo, copyChan chan<- copyBlocksState) error {
func (f *sendReceiveFolder) handleFile(ctx context.Context, file protocol.FileInfo, copyChan chan<- copyBlocksState) error {
curFile, hasCurFile, err := f.model.sdb.GetDeviceFile(f.folderID, protocol.LocalDeviceID, file.Name)
if err != nil {
return err
@@ -1146,7 +1146,7 @@ func (f *sendReceiveFolder) handleFile(file protocol.FileInfo, copyChan chan<- c
reused := make([]int, 0, len(file.Blocks))
if f.Type != config.FolderTypeReceiveEncrypted {
blocks, reused = f.reuseBlocks(blocks, reused, file, tempName)
blocks, reused = f.reuseBlocks(ctx, blocks, reused, file, tempName)
}
// The sharedpullerstate will know which flags to use when opening the
@@ -1170,7 +1170,7 @@ func (f *sendReceiveFolder) handleFile(file protocol.FileInfo, copyChan chan<- c
s := newSharedPullerState(file, f.mtimefs, f.folderID, tempName, blocks, reused, f.IgnorePerms || file.NoPermissions, hasCurFile, curFile, !f.DisableSparseFiles, !f.DisableFsync)
l.Debugf("%v need file %s; copy %d, reused %v", f, file.Name, len(blocks), len(reused))
f.sl.DebugContext(ctx, "Handling file", slogutil.FilePath(file.Name), "blocksToCopy", len(blocks), "reused", len(reused))
cs := copyBlocksState{
sharedPullerState: s,
@@ -1181,15 +1181,15 @@ func (f *sendReceiveFolder) handleFile(file protocol.FileInfo, copyChan chan<- c
return nil
}
func (f *sendReceiveFolder) reuseBlocks(blocks []protocol.BlockInfo, reused []int, file protocol.FileInfo, tempName string) ([]protocol.BlockInfo, []int) {
func (f *sendReceiveFolder) reuseBlocks(ctx context.Context, blocks []protocol.BlockInfo, reused []int, file protocol.FileInfo, tempName string) ([]protocol.BlockInfo, []int) {
// Check for an old temporary file which might have some blocks we could
// reuse.
tempBlocks, err := scanner.HashFile(f.ctx, f.ID, f.mtimefs, tempName, file.BlockSize(), nil)
tempBlocks, err := scanner.HashFile(ctx, f.ID, f.mtimefs, tempName, file.BlockSize(), nil)
if err != nil {
var caseErr *fs.CaseConflictError
if errors.As(err, &caseErr) {
if rerr := f.mtimefs.Rename(caseErr.Real, tempName); rerr == nil {
tempBlocks, err = scanner.HashFile(f.ctx, f.ID, f.mtimefs, tempName, file.BlockSize(), nil)
tempBlocks, err = scanner.HashFile(ctx, f.ID, f.mtimefs, tempName, file.BlockSize(), nil)
}
}
}
@@ -1262,7 +1262,7 @@ func populateOffsets(blocks []protocol.BlockInfo) {
// shortcutFile sets file metadata, when that's the only thing that has
// changed.
func (f *sendReceiveFolder) shortcutFile(file protocol.FileInfo, dbUpdateChan chan<- dbUpdateJob) {
l.Debugln(f, "taking shortcut on", file.Name)
f.sl.Debug("Taking metadata shortcut", slogutil.FilePath(file.Name))
f.evLogger.Log(events.ItemStarted, map[string]string{
"folder": f.folderID,
@@ -1330,7 +1330,7 @@ func (f *sendReceiveFolder) shortcutFile(file protocol.FileInfo, dbUpdateChan ch
// copierRoutine reads copierStates until the in channel closes and performs
// the relevant copies when possible, or passes it to the puller routine.
func (f *sendReceiveFolder) copierRoutine(in <-chan copyBlocksState, pullChan chan<- pullBlockState, out chan<- *sharedPullerState) {
func (f *sendReceiveFolder) copierRoutine(ctx context.Context, in <-chan copyBlocksState, pullChan chan<- pullBlockState, out chan<- *sharedPullerState) {
otherFolderFilesystems := make(map[string]fs.Filesystem)
for folder, cfg := range f.model.cfg.Folders() {
if folder == f.ID {
@@ -1349,8 +1349,8 @@ func (f *sendReceiveFolder) copierRoutine(in <-chan copyBlocksState, pullChan ch
blocks:
for _, block := range state.blocks {
select {
case <-f.ctx.Done():
state.fail(fmt.Errorf("folder stopped: %w", f.ctx.Err()))
case <-ctx.Done():
state.fail(fmt.Errorf("folder stopped: %w", ctx.Err()))
break blocks
default:
}
@@ -1368,7 +1368,7 @@ func (f *sendReceiveFolder) copierRoutine(in <-chan copyBlocksState, pullChan ch
continue
}
if f.copyBlock(block, state, otherFolderFilesystems) {
if f.copyBlock(ctx, block, state, otherFolderFilesystems) {
state.copyDone(block)
continue
}
@@ -1397,13 +1397,13 @@ func (f *sendReceiveFolder) copierRoutine(in <-chan copyBlocksState, pullChan ch
}
// Returns true when the block was successfully copied.
func (f *sendReceiveFolder) copyBlock(block protocol.BlockInfo, state copyBlocksState, otherFolderFilesystems map[string]fs.Filesystem) bool {
func (f *sendReceiveFolder) copyBlock(ctx context.Context, block protocol.BlockInfo, state copyBlocksState, otherFolderFilesystems map[string]fs.Filesystem) bool {
buf := protocol.BufferPool.Get(block.Size)
defer protocol.BufferPool.Put(buf)
// Hope that it's usually in the same folder, so start with that
// one. Also possibly more efficient copy (same filesystem).
if f.copyBlockFromFolder(f.ID, block, state, f.mtimefs, buf) {
if f.copyBlockFromFolder(ctx, f.ID, block, state, f.mtimefs, buf) {
return true
}
if state.failed() != nil {
@@ -1411,7 +1411,7 @@ func (f *sendReceiveFolder) copyBlock(block protocol.BlockInfo, state copyBlocks
}
for folderID, ffs := range otherFolderFilesystems {
if f.copyBlockFromFolder(folderID, block, state, ffs, buf) {
if f.copyBlockFromFolder(ctx, folderID, block, state, ffs, buf) {
return true
}
if state.failed() != nil {
@@ -1424,17 +1424,17 @@ func (f *sendReceiveFolder) copyBlock(block protocol.BlockInfo, state copyBlocks
// Returns true when the block was successfully copied.
// The passed buffer must be large enough to accommodate the block.
func (f *sendReceiveFolder) copyBlockFromFolder(folderID string, block protocol.BlockInfo, state copyBlocksState, ffs fs.Filesystem, buf []byte) bool {
func (f *sendReceiveFolder) copyBlockFromFolder(ctx context.Context, folderID string, block protocol.BlockInfo, state copyBlocksState, ffs fs.Filesystem, buf []byte) bool {
for e, err := range itererr.Zip(f.model.sdb.AllLocalBlocksWithHash(folderID, block.Hash)) {
if err != nil {
// We just ignore this and continue pulling instead (though
// there's a good chance that will fail too, if the DB is
// unhealthy).
l.Debugf("Failed to get information from DB about block %v in copier (folderID %v, file %v): %v", block.Hash, f.folderID, state.file.Name, err)
f.sl.DebugContext(ctx, "Failed to get block information from database", "blockHash", block.Hash, slogutil.FilePath(state.file.Name), slogutil.Error(err))
return false
}
if !f.copyBlockFromFile(e.FileName, e.Offset, state, ffs, block, buf) {
if !f.copyBlockFromFile(ctx, e.FileName, e.Offset, state, ffs, block, buf) {
if state.failed() != nil {
return false
}
@@ -1454,17 +1454,17 @@ func (f *sendReceiveFolder) copyBlockFromFolder(folderID string, block protocol.
// Returns true when the block was successfully copied.
// The passed buffer must be large enough to accommodate the block.
func (f *sendReceiveFolder) copyBlockFromFile(srcName string, srcOffset int64, state copyBlocksState, ffs fs.Filesystem, block protocol.BlockInfo, buf []byte) bool {
func (f *sendReceiveFolder) copyBlockFromFile(ctx context.Context, srcName string, srcOffset int64, state copyBlocksState, ffs fs.Filesystem, block protocol.BlockInfo, buf []byte) bool {
fd, err := ffs.Open(srcName)
if err != nil {
l.Debugf("Failed to open file %v trying to copy block %v (folderID %v): %v", srcName, block.Hash, f.folderID, err)
f.sl.DebugContext(ctx, "Failed to open source file for block copy", slogutil.FilePath(srcName), "blockHash", block.Hash, slogutil.Error(err))
return false
}
defer fd.Close()
_, err = fd.ReadAt(buf, srcOffset)
if err != nil {
l.Debugf("Failed to read block from file %v in copier (folderID: %v, hash: %v): %v", srcName, f.folderID, block.Hash, err)
f.sl.DebugContext(ctx, "Failed to read block from file", slogutil.FilePath(srcName), "blockHash", block.Hash, slogutil.Error(err))
return false
}
@@ -1473,7 +1473,7 @@ func (f *sendReceiveFolder) copyBlockFromFile(srcName string, srcOffset int64, s
// trust. (The other side can and will verify.)
if f.Type != config.FolderTypeReceiveEncrypted {
if err := f.verifyBuffer(buf, block); err != nil {
l.Debugf("Failed to verify buffer in copier (folderID: %v): %v", f.folderID, err)
f.sl.DebugContext(ctx, "Failed to verify block buffer", slogutil.Error(err))
return false
}
}
@@ -1485,13 +1485,13 @@ func (f *sendReceiveFolder) copyBlockFromFile(srcName string, srcOffset int64, s
}
if f.CopyRangeMethod != config.CopyRangeMethodStandard {
err = f.withLimiter(func() error {
err = f.withLimiter(ctx, func() error {
dstFd.mut.Lock()
defer dstFd.mut.Unlock()
return fs.CopyRange(f.CopyRangeMethod.ToFS(), fd, dstFd.fd, srcOffset, block.Offset, int64(block.Size))
})
} else {
err = f.limitedWriteAt(dstFd, buf, block.Offset)
err = f.limitedWriteAt(ctx, dstFd, buf, block.Offset)
}
if err != nil {
state.fail(fmt.Errorf("dst write: %w", err))
@@ -1513,7 +1513,7 @@ func (*sendReceiveFolder) verifyBuffer(buf []byte, block protocol.BlockInfo) err
return nil
}
func (f *sendReceiveFolder) pullerRoutine(in <-chan pullBlockState, out chan<- *sharedPullerState) {
func (f *sendReceiveFolder) pullerRoutine(ctx context.Context, in <-chan pullBlockState, out chan<- *sharedPullerState) {
requestLimiter := semaphore.New(f.PullerMaxPendingKiB * 1024)
var wg sync.WaitGroup
@@ -1531,7 +1531,7 @@ func (f *sendReceiveFolder) pullerRoutine(in <-chan pullBlockState, out chan<- *
bytes := state.block.Size
if err := requestLimiter.TakeWithContext(f.ctx, bytes); err != nil {
if err := requestLimiter.TakeWithContext(ctx, bytes); err != nil {
state.fail(err)
out <- state.sharedPullerState
continue
@@ -1543,13 +1543,13 @@ func (f *sendReceiveFolder) pullerRoutine(in <-chan pullBlockState, out chan<- *
defer wg.Done()
defer requestLimiter.Give(bytes)
f.pullBlock(state, out)
f.pullBlock(ctx, state, out)
}()
}
wg.Wait()
}
func (f *sendReceiveFolder) pullBlock(state pullBlockState, out chan<- *sharedPullerState) {
func (f *sendReceiveFolder) pullBlock(ctx context.Context, state pullBlockState, out chan<- *sharedPullerState) {
// Get an fd to the temporary file. Technically we don't need it until
// after fetching the block, but if we run into an error here there is
// no point in issuing the request to the network.
@@ -1572,8 +1572,8 @@ func (f *sendReceiveFolder) pullBlock(state pullBlockState, out chan<- *sharedPu
loop:
for {
select {
case <-f.ctx.Done():
state.fail(fmt.Errorf("folder stopped: %w", f.ctx.Err()))
case <-ctx.Done():
state.fail(fmt.Errorf("folder stopped: %w", ctx.Err()))
break loop
default:
}
@@ -1600,10 +1600,10 @@ loop:
activity.using(selected)
var buf []byte
blockNo := int(state.block.Offset / int64(state.file.BlockSize()))
buf, lastError = f.model.RequestGlobal(f.ctx, selected.ID, f.folderID, state.file.Name, blockNo, state.block.Offset, state.block.Size, state.block.Hash, selected.FromTemporary)
buf, lastError = f.model.RequestGlobal(ctx, selected.ID, f.folderID, state.file.Name, blockNo, state.block.Offset, state.block.Size, state.block.Hash, selected.FromTemporary)
activity.done(selected)
if lastError != nil {
l.Debugln("request:", f.folderID, state.file.Name, state.block.Offset, state.block.Size, selected.ID.Short(), "returned error:", lastError)
f.sl.DebugContext(ctx, "Block request returned error", slogutil.FilePath(state.file.Name), "offset", state.block.Offset, "size", state.block.Size, "device", selected.ID.Short(), slogutil.Error(lastError))
continue
}
@@ -1617,12 +1617,12 @@ loop:
lastError = f.verifyBuffer(buf, state.block)
}
if lastError != nil {
l.Debugln("request:", f.folderID, state.file.Name, state.block.Offset, state.block.Size, "hash mismatch")
f.sl.DebugContext(ctx, "Block hash mismatch", slogutil.FilePath(state.file.Name), "offset", state.block.Offset, "size", state.block.Size)
continue
}
// Save the block data we got from the cluster
err = f.limitedWriteAt(fd, buf, state.block.Offset)
err = f.limitedWriteAt(ctx, fd, buf, state.block.Offset)
if err != nil {
state.fail(fmt.Errorf("save: %w", err))
} else {
@@ -1687,10 +1687,10 @@ func (f *sendReceiveFolder) performFinish(file, curFile protocol.FileInfo, hasCu
return nil
}
func (f *sendReceiveFolder) finisherRoutine(in <-chan *sharedPullerState, dbUpdateChan chan<- dbUpdateJob, scanChan chan<- string) {
func (f *sendReceiveFolder) finisherRoutine(ctx context.Context, in <-chan *sharedPullerState, dbUpdateChan chan<- dbUpdateJob, scanChan chan<- string) {
for state := range in {
if closed, err := state.finalClose(); closed {
l.Debugln(f, "closing", state.file.Name)
f.sl.DebugContext(ctx, "Closing temp file", slogutil.FilePath(state.file.Name))
f.queue.Done(state.file.Name)
@@ -1701,7 +1701,7 @@ func (f *sendReceiveFolder) finisherRoutine(in <-chan *sharedPullerState, dbUpda
if err != nil {
f.newPullError(state.file.Name, fmt.Errorf("finishing: %w", err))
} else {
slog.Info("Synced file", f.LogAttr(), state.file.LogAttr(), slog.Group("blocks", slog.Int("local", state.reused+state.copyTotal), slog.Int("download", state.pullTotal)))
slog.InfoContext(ctx, "Synced file", f.LogAttr(), state.file.LogAttr(), slog.Group("blocks", slog.Int("local", state.reused+state.copyTotal), slog.Int("download", state.pullTotal)))
minBlocksPerBlock := state.file.BlockSize() / protocol.MinBlockSize
blockStatsMut.Lock()
@@ -1756,11 +1756,11 @@ func (f *sendReceiveFolder) dbUpdaterRoutine(dbUpdateChan <-chan dbUpdateJob) {
if !f.DisableFsync {
fd, err := f.mtimefs.Open(dir)
if err != nil {
l.Debugf("fsync %q failed: %v", dir, err)
f.sl.Debug("Fsync failed", slogutil.FilePath(dir), slogutil.Error(err))
continue
}
if err := fd.Sync(); err != nil {
l.Debugf("fsync %q failed: %v", dir, err)
f.sl.Debug("Fsync failed", slogutil.FilePath(dir), slogutil.Error(err))
}
fd.Close()
}
@@ -1832,7 +1832,7 @@ loop:
// pullScannerRoutine aggregates paths to be scanned after pulling. The scan is
// scheduled once when scanChan is closed (scanning can not happen during pulling).
func (f *sendReceiveFolder) pullScannerRoutine(scanChan <-chan string) {
func (f *sendReceiveFolder) pullScannerRoutine(ctx context.Context, scanChan <-chan string) {
toBeScanned := make(map[string]struct{})
for path := range scanChan {
@@ -1842,7 +1842,7 @@ func (f *sendReceiveFolder) pullScannerRoutine(scanChan <-chan string) {
if len(toBeScanned) != 0 {
scanList := make([]string, 0, len(toBeScanned))
for path := range toBeScanned {
l.Debugln(f, "scheduling scan after pulling for", path)
slog.DebugContext(ctx, "Scheduling scan after pulling", slogutil.FilePath(path))
scanList = append(scanList, path)
}
f.Scan(scanList)
@@ -1883,7 +1883,7 @@ func (f *sendReceiveFolder) moveForConflict(name, lastModBy string, scanChan cha
})
for _, match := range matches[f.MaxConflicts:] {
if gerr := f.mtimefs.Remove(match); gerr != nil {
l.Debugln(f, "removing extra conflict", gerr)
f.sl.Debug("Failed to remove extra conflict copy", slogutil.Error(gerr))
}
}
}
@@ -1895,7 +1895,7 @@ func (f *sendReceiveFolder) moveForConflict(name, lastModBy string, scanChan cha
}
func (f *sendReceiveFolder) newPullError(path string, err error) {
if errors.Is(err, f.ctx.Err()) {
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
// Error because the folder stopped - no point logging/tracking
return
}
@@ -1916,7 +1916,7 @@ func (f *sendReceiveFolder) newPullError(path string, err error) {
errStr := fmt.Sprintf("syncing: %s", err)
f.tempPullErrors[path] = errStr
l.Debugf("%v new error for %v: %v", f, path, err)
f.sl.Debug("New pull error", slogutil.FilePath(path), slogutil.Error(err))
}
// deleteItemOnDisk deletes the file represented by old that is about to be replaced by new.
@@ -2116,7 +2116,7 @@ func (f *sendReceiveFolder) scanIfItemChanged(name string, stat fs.FileInfo, ite
// I.e. non-nil error status means "Do not delete!" or "is already deleted".
func (f *sendReceiveFolder) checkToBeDeleted(file, cur protocol.FileInfo, hasCur bool, scanChan chan<- string) error {
if err := osutil.TraversesSymlink(f.mtimefs, filepath.Dir(file.Name)); err != nil {
l.Debugln(f, "not deleting item behind symlink on disk, but update db", file.Name)
f.sl.Debug("Not deleting item behind symlink on disk, but updating database", slogutil.FilePath(file.Name))
return fs.ErrNotExist
}
@@ -2130,7 +2130,7 @@ func (f *sendReceiveFolder) checkToBeDeleted(file, cur protocol.FileInfo, hasCur
scanChan <- file.Name
return errModified
}
l.Debugln(f, "not deleting item we don't have, but update db", file.Name)
f.sl.Debug("Not deleting item we don't have, but updating database", slogutil.FilePath(file.Name))
return err
}
@@ -2144,7 +2144,7 @@ func (f *sendReceiveFolder) setPlatformData(file *protocol.FileInfo, name string
if f.SyncXattrs {
// Set extended attributes.
if err := f.mtimefs.SetXattr(name, file.Platform.Xattrs(), f.XattrFilter); errors.Is(err, fs.ErrXattrsNotSupported) {
l.Debugf("Cannot set xattrs on %q: %v", file.Name, err)
f.sl.Debug("Cannot set xattrs (not supported)", slogutil.FilePath(file.Name), slogutil.Error(err))
} else if err != nil {
return err
}
@@ -2185,15 +2185,15 @@ func (f *sendReceiveFolder) inWritableDir(fn func(string) error, path string) er
return inWritableDir(fn, f.mtimefs, path, f.IgnorePerms)
}
func (f *sendReceiveFolder) limitedWriteAt(fd io.WriterAt, data []byte, offset int64) error {
return f.withLimiter(func() error {
func (f *sendReceiveFolder) limitedWriteAt(ctx context.Context, fd io.WriterAt, data []byte, offset int64) error {
return f.withLimiter(ctx, func() error {
_, err := fd.WriteAt(data, offset)
return err
})
}
func (f *sendReceiveFolder) withLimiter(fn func() error) error {
if err := f.writeLimiter.TakeWithContext(f.ctx, 1); err != nil {
func (f *sendReceiveFolder) withLimiter(ctx context.Context, fn func() error) error {
if err := f.writeLimiter.TakeWithContext(ctx, 1); err != nil {
return err
}
defer f.writeLimiter.Give(1)
@@ -2235,7 +2235,7 @@ func existingConflicts(name string, fs fs.Filesystem) []string {
ext := filepath.Ext(name)
matches, err := fs.Glob(name[:len(name)-len(ext)] + ".sync-conflict-????????-??????*" + ext)
if err != nil {
l.Debugln("globbing for conflicts", err)
slog.Debug("Globbing for conflicts failed", slogutil.Error(err))
}
return matches
}

View File

@@ -65,8 +65,6 @@ func prepareTmpFile(to fs.Filesystem) (string, error) {
return tmpName, nil
}
var folders = []string{"default"}
var diffTestData = []struct {
a string
b string
@@ -112,8 +110,8 @@ func createEmptyFileInfo(t *testing.T, name string, fs fs.Filesystem) protocol.F
}
// Sets up a folder and model, but makes sure the services aren't actually running.
func setupSendReceiveFolder(t testing.TB, files ...protocol.FileInfo) (*testModel, *sendReceiveFolder, context.CancelFunc) {
w, fcfg, wCancel := newDefaultCfgWrapper()
func setupSendReceiveFolder(t testing.TB, files ...protocol.FileInfo) (*testModel, *sendReceiveFolder) {
w, fcfg := newDefaultCfgWrapper(t)
// Initialise model and stop immediately.
model := setupModel(t, w)
model.cancel()
@@ -121,14 +119,13 @@ func setupSendReceiveFolder(t testing.TB, files ...protocol.FileInfo) (*testMode
r, _ := model.folderRunners.Get(fcfg.ID)
f := r.(*sendReceiveFolder)
f.tempPullErrors = make(map[string]string)
f.ctx = context.Background()
// Update index
if files != nil {
f.updateLocalsFromScanning(files)
}
return model, f, wCancel
return model, f
}
// Layout of the files: (indexes from the above array)
@@ -146,12 +143,11 @@ func TestHandleFile(t *testing.T) {
requiredFile := existingFile
requiredFile.Blocks = blocks[1:]
_, f, wcfgCancel := setupSendReceiveFolder(t, existingFile)
defer wcfgCancel()
_, f := setupSendReceiveFolder(t, existingFile)
copyChan := make(chan copyBlocksState, 1)
f.handleFile(requiredFile, copyChan)
f.handleFile(t.Context(), requiredFile, copyChan)
// Receive the results
toCopy := <-copyChan
@@ -188,8 +184,7 @@ func TestHandleFileWithTemp(t *testing.T) {
requiredFile := existingFile
requiredFile.Blocks = blocks[1:]
_, f, wcfgCancel := setupSendReceiveFolder(t, existingFile)
defer wcfgCancel()
_, f := setupSendReceiveFolder(t, existingFile)
if _, err := prepareTmpFile(f.Filesystem()); err != nil {
t.Fatal(err)
@@ -197,7 +192,7 @@ func TestHandleFileWithTemp(t *testing.T) {
copyChan := make(chan copyBlocksState, 1)
f.handleFile(requiredFile, copyChan)
f.handleFile(t.Context(), requiredFile, copyChan)
// Receive the results
toCopy := <-copyChan
@@ -238,8 +233,7 @@ func TestCopierFinder(t *testing.T) {
requiredFile.Blocks = blocks[1:]
requiredFile.Name = "file2"
_, f, wcfgCancel := setupSendReceiveFolder(t, existingFile)
defer wcfgCancel()
_, f := setupSendReceiveFolder(t, existingFile)
if _, err := prepareTmpFile(f.Filesystem()); err != nil {
t.Fatal(err)
@@ -250,10 +244,10 @@ func TestCopierFinder(t *testing.T) {
finisherChan := make(chan *sharedPullerState, 1)
// Run a single fetcher routine
go f.copierRoutine(copyChan, pullChan, finisherChan)
go f.copierRoutine(t.Context(), copyChan, pullChan, finisherChan)
defer close(copyChan)
f.handleFile(requiredFile, copyChan)
f.handleFile(t.Context(), requiredFile, copyChan)
timeout := time.After(10 * time.Second)
pulls := make([]pullBlockState, 4)
@@ -302,7 +296,7 @@ func TestCopierFinder(t *testing.T) {
}
// Verify that the fetched blocks have actually been written to the temp file
blks, err := scanner.HashFile(context.TODO(), f.ID, f.Filesystem(), tempFile, protocol.MinBlockSize, nil)
blks, err := scanner.HashFile(t.Context(), f.ID, f.Filesystem(), tempFile, protocol.MinBlockSize, nil)
if err != nil {
t.Log(err)
}
@@ -319,8 +313,7 @@ func TestCopierCleanup(t *testing.T) {
// Create a file
file := setupFile("test", []int{0})
file.Size = 1
m, f, wcfgCancel := setupSendReceiveFolder(t, file)
defer wcfgCancel()
m, f := setupSendReceiveFolder(t, file)
file.Blocks = []protocol.BlockInfo{blocks[1]}
file.Version = file.Version.Update(myID.Short())
@@ -352,8 +345,7 @@ func TestCopierCleanup(t *testing.T) {
func TestDeregisterOnFailInCopy(t *testing.T) {
file := setupFile("filex", []int{0, 2, 0, 0, 5, 0, 0, 8})
m, f, wcfgCancel := setupSendReceiveFolder(t)
defer wcfgCancel()
m, f := setupSendReceiveFolder(t)
// Set up our evet subscription early
s := m.evLogger.Subscribe(events.ItemFinished)
@@ -371,8 +363,8 @@ func TestDeregisterOnFailInCopy(t *testing.T) {
finisherChan := make(chan *sharedPullerState)
dbUpdateChan := make(chan dbUpdateJob, 1)
copyChan, copyWg := startCopier(f, pullChan, finisherBufferChan)
go f.finisherRoutine(finisherChan, dbUpdateChan, make(chan string))
copyChan, copyWg := startCopier(t.Context(), f, pullChan, finisherBufferChan)
go f.finisherRoutine(t.Context(), finisherChan, dbUpdateChan, make(chan string))
defer func() {
close(copyChan)
@@ -382,7 +374,7 @@ func TestDeregisterOnFailInCopy(t *testing.T) {
close(finisherChan)
}()
f.handleFile(file, copyChan)
f.handleFile(t.Context(), file, copyChan)
// Receive a block at puller, to indicate that at least a single copier
// loop has been performed.
@@ -451,8 +443,7 @@ func TestDeregisterOnFailInCopy(t *testing.T) {
func TestDeregisterOnFailInPull(t *testing.T) {
file := setupFile("filex", []int{0, 2, 0, 0, 5, 0, 0, 8})
m, f, wcfgCancel := setupSendReceiveFolder(t)
defer wcfgCancel()
m, f := setupSendReceiveFolder(t)
// Set up our evet subscription early
s := m.evLogger.Subscribe(events.ItemFinished)
@@ -470,14 +461,14 @@ func TestDeregisterOnFailInPull(t *testing.T) {
finisherChan := make(chan *sharedPullerState)
dbUpdateChan := make(chan dbUpdateJob, 1)
copyChan, copyWg := startCopier(f, pullChan, finisherBufferChan)
copyChan, copyWg := startCopier(t.Context(), f, pullChan, finisherBufferChan)
var pullWg sync.WaitGroup
pullWg.Add(1)
go func() {
f.pullerRoutine(pullChan, finisherBufferChan)
f.pullerRoutine(t.Context(), pullChan, finisherBufferChan)
pullWg.Done()
}()
go f.finisherRoutine(finisherChan, dbUpdateChan, make(chan string))
go f.finisherRoutine(t.Context(), finisherChan, dbUpdateChan, make(chan string))
defer func() {
// Unblock copier and puller
go func() {
@@ -492,7 +483,7 @@ func TestDeregisterOnFailInPull(t *testing.T) {
close(finisherChan)
}()
f.handleFile(file, copyChan)
f.handleFile(t.Context(), file, copyChan)
// Receive at finisher, we should error out as puller has nowhere to pull
// from.
@@ -553,8 +544,7 @@ func TestDeregisterOnFailInPull(t *testing.T) {
}
func TestIssue3164(t *testing.T) {
_, f, wcfgCancel := setupSendReceiveFolder(t)
defer wcfgCancel()
_, f := setupSendReceiveFolder(t)
ffs := f.Filesystem()
ignDir := filepath.Join("issue3164", "oktodelete")
@@ -566,7 +556,7 @@ func TestIssue3164(t *testing.T) {
Name: "issue3164",
}
must(t, f.scanSubdirs(nil))
must(t, f.scanSubdirs(t.Context(), nil))
matcher := ignore.New(ffs)
must(t, matcher.Parse(bytes.NewBufferString("(?d)oktodelete"), ""))
@@ -583,8 +573,8 @@ func TestIssue3164(t *testing.T) {
func TestDiff(t *testing.T) {
for i, test := range diffTestData {
a, _ := scanner.Blocks(context.TODO(), bytes.NewBufferString(test.a), test.s, -1, nil)
b, _ := scanner.Blocks(context.TODO(), bytes.NewBufferString(test.b), test.s, -1, nil)
a, _ := scanner.Blocks(t.Context(), bytes.NewBufferString(test.a), test.s, -1, nil)
b, _ := scanner.Blocks(t.Context(), bytes.NewBufferString(test.b), test.s, -1, nil)
_, d := blockDiff(a, b)
if len(d) != len(test.d) {
t.Fatalf("Incorrect length for diff %d; %d != %d", i, len(d), len(test.d))
@@ -604,9 +594,9 @@ func TestDiff(t *testing.T) {
func BenchmarkDiff(b *testing.B) {
testCases := make([]struct{ a, b []protocol.BlockInfo }, 0, len(diffTestData))
for _, test := range diffTestData {
a, _ := scanner.Blocks(context.TODO(), bytes.NewBufferString(test.a), test.s, -1, nil)
b, _ := scanner.Blocks(context.TODO(), bytes.NewBufferString(test.b), test.s, -1, nil)
testCases = append(testCases, struct{ a, b []protocol.BlockInfo }{a, b})
aBlocks, _ := scanner.Blocks(b.Context(), bytes.NewBufferString(test.a), test.s, -1, nil)
bBlocks, _ := scanner.Blocks(b.Context(), bytes.NewBufferString(test.b), test.s, -1, nil)
testCases = append(testCases, struct{ a, b []protocol.BlockInfo }{aBlocks, bBlocks})
}
b.ReportAllocs()
b.ResetTimer()
@@ -643,8 +633,7 @@ func TestDiffEmpty(t *testing.T) {
// option is true and the permissions do not match between the file on disk and
// in the db.
func TestDeleteIgnorePerms(t *testing.T) {
_, f, wcfgCancel := setupSendReceiveFolder(t)
defer wcfgCancel()
_, f := setupSendReceiveFolder(t)
ffs := f.Filesystem()
f.IgnorePerms = true
@@ -682,7 +671,7 @@ func TestCopyOwner(t *testing.T) {
)
// This test hung on a regression, taking a long time to fail - speed that up.
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
ctx, cancel := context.WithTimeout(t.Context(), 2*time.Second)
defer cancel()
go func() {
<-ctx.Done()
@@ -695,8 +684,7 @@ func TestCopyOwner(t *testing.T) {
// Set up a folder with the CopyParentOwner bit and backed by a fake
// filesystem.
m, f, wcfgCancel := setupSendReceiveFolder(t)
defer wcfgCancel()
m, f := setupSendReceiveFolder(t)
f.folder.FolderConfiguration = newFolderConfiguration(m.cfg, f.ID, f.Label, config.FilesystemTypeFake, "/TestCopyOwner")
f.folder.FolderConfiguration.CopyOwnershipFromParent = true
@@ -748,15 +736,15 @@ func TestCopyOwner(t *testing.T) {
// comes the finisher is done.
finisherChan := make(chan *sharedPullerState)
copierChan, copyWg := startCopier(f, nil, finisherChan)
go f.finisherRoutine(finisherChan, dbUpdateChan, nil)
copierChan, copyWg := startCopier(t.Context(), f, nil, finisherChan)
go f.finisherRoutine(t.Context(), finisherChan, dbUpdateChan, nil)
defer func() {
close(copierChan)
copyWg.Wait()
close(finisherChan)
}()
f.handleFile(file, copierChan)
f.handleFile(t.Context(), file, copierChan)
<-dbUpdateChan
info, err = f.mtimefs.Lstat("foo/bar/baz")
@@ -794,8 +782,7 @@ func TestCopyOwner(t *testing.T) {
// TestSRConflictReplaceFileByDir checks that a conflict is created when an existing file
// is replaced with a directory and versions are conflicting
func TestSRConflictReplaceFileByDir(t *testing.T) {
_, f, wcfgCancel := setupSendReceiveFolder(t)
defer wcfgCancel()
_, f := setupSendReceiveFolder(t)
ffs := f.Filesystem()
name := "foo"
@@ -826,8 +813,7 @@ func TestSRConflictReplaceFileByDir(t *testing.T) {
// TestSRConflictReplaceFileByLink checks that a conflict is created when an existing file
// is replaced with a link and versions are conflicting
func TestSRConflictReplaceFileByLink(t *testing.T) {
_, f, wcfgCancel := setupSendReceiveFolder(t)
defer wcfgCancel()
_, f := setupSendReceiveFolder(t)
ffs := f.Filesystem()
name := "foo"
@@ -859,8 +845,7 @@ func TestSRConflictReplaceFileByLink(t *testing.T) {
// TestDeleteBehindSymlink checks that we don't delete or schedule a scan
// when trying to delete a file behind a symlink.
func TestDeleteBehindSymlink(t *testing.T) {
_, f, wcfgCancel := setupSendReceiveFolder(t)
defer wcfgCancel()
_, f := setupSendReceiveFolder(t)
ffs := f.Filesystem()
link := "link"
@@ -898,16 +883,14 @@ func TestDeleteBehindSymlink(t *testing.T) {
// Reproduces https://github.com/syncthing/syncthing/issues/6559
func TestPullCtxCancel(t *testing.T) {
_, f, wcfgCancel := setupSendReceiveFolder(t)
defer wcfgCancel()
_, f := setupSendReceiveFolder(t)
pullChan := make(chan pullBlockState)
finisherChan := make(chan *sharedPullerState)
var cancel context.CancelFunc
f.ctx, cancel = context.WithCancel(context.Background())
ctx, cancel := context.WithCancel(t.Context())
go f.pullerRoutine(pullChan, finisherChan)
go f.pullerRoutine(ctx, pullChan, finisherChan)
defer close(pullChan)
emptyState := func() pullBlockState {
@@ -940,8 +923,7 @@ func TestPullCtxCancel(t *testing.T) {
}
func TestPullDeleteUnscannedDir(t *testing.T) {
_, f, wcfgCancel := setupSendReceiveFolder(t)
defer wcfgCancel()
_, f := setupSendReceiveFolder(t)
ffs := f.Filesystem()
dir := "foobar"
@@ -969,14 +951,13 @@ func TestPullDeleteUnscannedDir(t *testing.T) {
}
func TestPullCaseOnlyPerformFinish(t *testing.T) {
m, f, wcfgCancel := setupSendReceiveFolder(t)
defer wcfgCancel()
m, f := setupSendReceiveFolder(t)
ffs := f.Filesystem()
name := "foo"
contents := []byte("contents")
writeFile(t, ffs, name, contents)
must(t, f.scanSubdirs(nil))
must(t, f.scanSubdirs(t.Context(), nil))
var cur protocol.FileInfo
hasCur := false
@@ -1032,8 +1013,7 @@ func TestPullCaseOnlySymlink(t *testing.T) {
}
func testPullCaseOnlyDirOrSymlink(t *testing.T, dir bool) {
m, f, wcfgCancel := setupSendReceiveFolder(t)
defer wcfgCancel()
m, f := setupSendReceiveFolder(t)
ffs := f.Filesystem()
name := "foo"
@@ -1043,7 +1023,7 @@ func testPullCaseOnlyDirOrSymlink(t *testing.T, dir bool) {
must(t, ffs.CreateSymlink("target", name))
}
must(t, f.scanSubdirs(nil))
must(t, f.scanSubdirs(t.Context(), nil))
var cur protocol.FileInfo
hasCur := false
it, errFn := m.LocalFiles(f.ID, protocol.LocalDeviceID)
@@ -1089,8 +1069,7 @@ func testPullCaseOnlyDirOrSymlink(t *testing.T, dir bool) {
}
func TestPullTempFileCaseConflict(t *testing.T) {
_, f, wcfgCancel := setupSendReceiveFolder(t)
defer wcfgCancel()
_, f := setupSendReceiveFolder(t)
copyChan := make(chan copyBlocksState, 1)
@@ -1106,7 +1085,7 @@ func TestPullTempFileCaseConflict(t *testing.T) {
fd.Close()
}
f.handleFile(file, copyChan)
f.handleFile(t.Context(), file, copyChan)
cs := <-copyChan
if _, err := cs.tempFile(); err != nil {
@@ -1117,8 +1096,7 @@ func TestPullTempFileCaseConflict(t *testing.T) {
}
func TestPullCaseOnlyRename(t *testing.T) {
m, f, wcfgCancel := setupSendReceiveFolder(t)
defer wcfgCancel()
m, f := setupSendReceiveFolder(t)
// tempNameConfl := fs.TempName(confl)
@@ -1132,7 +1110,7 @@ func TestPullCaseOnlyRename(t *testing.T) {
fd.Close()
}
must(t, f.scanSubdirs(nil))
must(t, f.scanSubdirs(t.Context(), nil))
cur, ok := m.testCurrentFolderFile(f.ID, name)
if !ok {
@@ -1158,8 +1136,7 @@ func TestPullSymlinkOverExistingWindows(t *testing.T) {
t.Skip()
}
m, f, wcfgCancel := setupSendReceiveFolder(t)
defer wcfgCancel()
m, f := setupSendReceiveFolder(t)
conn := addFakeConn(m, device1, f.ID)
name := "foo"
@@ -1172,7 +1149,7 @@ func TestPullSymlinkOverExistingWindows(t *testing.T) {
fd.Close()
}
must(t, f.scanSubdirs(nil))
must(t, f.scanSubdirs(t.Context(), nil))
file, ok := m.testCurrentFolderFile(f.ID, name)
if !ok {
@@ -1182,7 +1159,7 @@ func TestPullSymlinkOverExistingWindows(t *testing.T) {
scanChan := make(chan string)
changed, err := f.pullerIteration(scanChan)
changed, err := f.pullerIteration(t.Context(), scanChan)
must(t, err)
if changed != 1 {
t.Error("Expected one change in pull, got", changed)
@@ -1200,8 +1177,7 @@ func TestPullSymlinkOverExistingWindows(t *testing.T) {
}
func TestPullDeleteCaseConflict(t *testing.T) {
_, f, wcfgCancel := setupSendReceiveFolder(t)
defer wcfgCancel()
_, f := setupSendReceiveFolder(t)
name := "foo"
fi := protocol.FileInfo{Name: "Foo"}
@@ -1232,8 +1208,7 @@ func TestPullDeleteCaseConflict(t *testing.T) {
}
func TestPullDeleteIgnoreChildDir(t *testing.T) {
_, f, wcfgCancel := setupSendReceiveFolder(t)
defer wcfgCancel()
_, f := setupSendReceiveFolder(t)
parent := "parent"
del := "ignored"
@@ -1268,12 +1243,12 @@ func cleanupSharedPullerState(s *sharedPullerState) {
s.writer.mut.Unlock()
}
func startCopier(f *sendReceiveFolder, pullChan chan<- pullBlockState, finisherChan chan<- *sharedPullerState) (chan copyBlocksState, *sync.WaitGroup) {
func startCopier(ctx context.Context, f *sendReceiveFolder, pullChan chan<- pullBlockState, finisherChan chan<- *sharedPullerState) (chan copyBlocksState, *sync.WaitGroup) {
copyChan := make(chan copyBlocksState)
wg := new(sync.WaitGroup)
wg.Add(1)
go func() {
f.copierRoutine(copyChan, pullChan, finisherChan)
f.copierRoutine(ctx, copyChan, pullChan, finisherChan)
wg.Done()
}()
return copyChan, wg

View File

@@ -77,9 +77,8 @@ func addFolderDevicesToClusterConfig(cc *protocol.ClusterConfig, remote protocol
}
func TestRequest(t *testing.T) {
wrapper, fcfg, cancel := newDefaultCfgWrapper()
wrapper, fcfg := newDefaultCfgWrapper(t)
ffs := fcfg.Filesystem()
defer cancel()
m := setupModel(t, wrapper)
defer cleanupModel(m)
@@ -164,8 +163,7 @@ func BenchmarkIndex_100(b *testing.B) {
}
func benchmarkIndex(b *testing.B, nfiles int) {
m, _, fcfg, wcfgCancel := setupModelWithConnection(b)
defer wcfgCancel()
m, _, fcfg := setupModelWithConnection(b)
defer cleanupModelAndRemoveDir(m, fcfg.Filesystem().URI())
files := genFiles(nfiles)
@@ -191,8 +189,7 @@ func BenchmarkIndexUpdate_10000_1(b *testing.B) {
}
func benchmarkIndexUpdate(b *testing.B, nfiles, nufiles int) {
m, _, fcfg, wcfgCancel := setupModelWithConnection(b)
defer wcfgCancel()
m, _, fcfg := setupModelWithConnection(b)
defer cleanupModelAndRemoveDir(m, fcfg.Filesystem().URI())
files := genFiles(nfiles)
@@ -223,7 +220,7 @@ func BenchmarkRequestOut(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
data, err := m.RequestGlobal(context.Background(), device1, "default", files[i%n].Name, 0, 0, 32, nil, false)
data, err := m.RequestGlobal(b.Context(), device1, "default", files[i%n].Name, 0, 0, 32, nil, false)
if err != nil {
b.Error(err)
}
@@ -1777,8 +1774,7 @@ func TestRWScanRecovery(t *testing.T) {
}
func TestGlobalDirectoryTree(t *testing.T) {
m, conn, fcfg, wCancel := setupModelWithConnection(t)
defer wCancel()
m, conn, fcfg := setupModelWithConnection(t)
defer cleanupModelAndRemoveDir(m, fcfg.Filesystem().URI())
var seq int64
@@ -2084,8 +2080,7 @@ func BenchmarkTree_100_10(b *testing.B) {
}
func benchmarkTree(b *testing.B, n1, n2 int) {
m, _, fcfg, wcfgCancel := setupModelWithConnection(b)
defer wcfgCancel()
m, _, fcfg := setupModelWithConnection(b)
defer cleanupModelAndRemoveDir(m, fcfg.Filesystem().URI())
m.ScanFolder(fcfg.ID)
@@ -2342,8 +2337,7 @@ func TestIssue3829(t *testing.T) {
// TestIssue4573 tests that contents of an unavailable dir aren't marked deleted
func TestIssue4573(t *testing.T) {
w, fcfg, wCancel := newDefaultCfgWrapper()
defer wCancel()
w, fcfg := newDefaultCfgWrapper(t)
testFs := fcfg.Filesystem()
defer os.RemoveAll(testFs.URI())
@@ -2372,8 +2366,7 @@ func TestIssue4573(t *testing.T) {
// TestInternalScan checks whether various fs operations are correctly represented
// in the db after scanning.
func TestInternalScan(t *testing.T) {
w, fcfg, wCancel := newDefaultCfgWrapper()
defer wCancel()
w, fcfg := newDefaultCfgWrapper(t)
testFs := fcfg.Filesystem()
defer os.RemoveAll(testFs.URI())
@@ -2472,8 +2465,7 @@ func TestCustomMarkerName(t *testing.T) {
}
func TestRemoveDirWithContent(t *testing.T) {
m, conn, fcfg, wcfgCancel := setupModelWithConnection(t)
defer wcfgCancel()
m, conn, fcfg := setupModelWithConnection(t)
tfs := fcfg.Filesystem()
defer cleanupModelAndRemoveDir(m, tfs.URI())
@@ -2533,8 +2525,7 @@ func TestRemoveDirWithContent(t *testing.T) {
}
func TestIssue4475(t *testing.T) {
m, conn, fcfg, wcfgCancel := setupModelWithConnection(t)
defer wcfgCancel()
m, conn, fcfg := setupModelWithConnection(t)
defer cleanupModel(m)
testFs := fcfg.Filesystem()
@@ -2870,8 +2861,7 @@ func TestIssue4903(t *testing.T) {
func TestIssue5002(t *testing.T) {
// recheckFile should not panic when given an index equal to the number of blocks
w, fcfg, wCancel := newDefaultCfgWrapper()
defer wCancel()
w, fcfg := newDefaultCfgWrapper(t)
ffs := fcfg.Filesystem()
fd, err := ffs.Create("foo")
@@ -2899,8 +2889,7 @@ func TestIssue5002(t *testing.T) {
}
func TestParentOfUnignored(t *testing.T) {
w, fcfg, wCancel := newDefaultCfgWrapper()
defer wCancel()
w, fcfg := newDefaultCfgWrapper(t)
ffs := fcfg.Filesystem()
must(t, ffs.Mkdir("bar", 0o755))
@@ -2979,15 +2968,13 @@ func TestFolderRestartZombies(t *testing.T) {
}
func TestRequestLimit(t *testing.T) {
wrapper, fcfg, cancel := newDefaultCfgWrapper()
wrapper, fcfg := newDefaultCfgWrapper(t)
ffs := fcfg.Filesystem()
file := "tmpfile"
fd, err := ffs.Create(file)
must(t, err)
fd.Close()
defer cancel()
waiter, err := wrapper.Modify(func(cfg *config.Configuration) {
_, i, _ := cfg.Device(device1)
cfg.Devices[i].MaxRequestKiB = 1
@@ -3037,8 +3024,7 @@ func TestConnCloseOnRestart(t *testing.T) {
protocol.CloseTimeout = oldCloseTimeout
}()
w, fcfg, wCancel := newDefaultCfgWrapper()
defer wCancel()
w, fcfg := newDefaultCfgWrapper(t)
m := setupModel(t, w)
defer cleanupModelAndRemoveDir(m, fcfg.Filesystem().URI())
@@ -3079,8 +3065,7 @@ func TestConnCloseOnRestart(t *testing.T) {
}
func TestModTimeWindow(t *testing.T) {
w, fcfg, wCancel := newDefaultCfgWrapper()
defer wCancel()
w, fcfg := newDefaultCfgWrapper(t)
tfs := modtimeTruncatingFS{
trunc: 0,
Filesystem: fcfg.Filesystem(),
@@ -3141,8 +3126,7 @@ func TestModTimeWindow(t *testing.T) {
}
func TestDevicePause(t *testing.T) {
m, _, fcfg, wcfgCancel := setupModelWithConnection(t)
defer wcfgCancel()
m, _, fcfg := setupModelWithConnection(t)
defer cleanupModelAndRemoveDir(m, fcfg.Filesystem().URI())
sub := m.evLogger.Subscribe(events.DevicePaused)
@@ -3171,8 +3155,7 @@ func TestDevicePause(t *testing.T) {
}
func TestDeviceWasSeen(t *testing.T) {
m, _, fcfg, wcfgCancel := setupModelWithConnection(t)
defer wcfgCancel()
m, _, fcfg := setupModelWithConnection(t)
defer cleanupModelAndRemoveDir(m, fcfg.Filesystem().URI())
m.deviceWasSeen(device1)
@@ -3216,8 +3199,7 @@ func TestNewLimitedRequestResponse(t *testing.T) {
}
func TestSummaryPausedNoError(t *testing.T) {
wcfg, fcfg, wcfgCancel := newDefaultCfgWrapper()
defer wcfgCancel()
wcfg, fcfg := newDefaultCfgWrapper(t)
pauseFolder(t, wcfg, fcfg.ID, true)
m := setupModel(t, wcfg)
defer cleanupModel(m)
@@ -3229,14 +3211,15 @@ func TestSummaryPausedNoError(t *testing.T) {
}
func TestFolderAPIErrors(t *testing.T) {
wcfg, fcfg, wcfgCancel := newDefaultCfgWrapper()
defer wcfgCancel()
wcfg, fcfg := newDefaultCfgWrapper(t)
pauseFolder(t, wcfg, fcfg.ID, true)
m := setupModel(t, wcfg)
defer cleanupModel(m)
methods := []func(folder string) error{
m.ScanFolder,
func(folder string) error {
return m.ScanFolder(folder)
},
func(folder string) error {
return m.ScanFolderSubdirs(folder, nil)
},
@@ -3261,8 +3244,7 @@ func TestFolderAPIErrors(t *testing.T) {
}
func TestRenameSequenceOrder(t *testing.T) {
wcfg, fcfg, wcfgCancel := newDefaultCfgWrapper()
defer wcfgCancel()
wcfg, fcfg := newDefaultCfgWrapper(t)
m := setupModel(t, wcfg)
defer cleanupModel(m)
@@ -3324,8 +3306,7 @@ func TestRenameSequenceOrder(t *testing.T) {
}
func TestRenameSameFile(t *testing.T) {
wcfg, fcfg, wcfgCancel := newDefaultCfgWrapper()
defer wcfgCancel()
wcfg, fcfg := newDefaultCfgWrapper(t)
m := setupModel(t, wcfg)
defer cleanupModel(m)
@@ -3368,8 +3349,7 @@ func TestRenameSameFile(t *testing.T) {
}
func TestBlockListMap(t *testing.T) {
wcfg, fcfg, wcfgCancel := newDefaultCfgWrapper()
defer wcfgCancel()
wcfg, fcfg := newDefaultCfgWrapper(t)
m := setupModel(t, wcfg)
defer cleanupModel(m)
@@ -3437,8 +3417,7 @@ func TestBlockListMap(t *testing.T) {
}
func TestScanRenameCaseOnly(t *testing.T) {
wcfg, fcfg, wcfgCancel := newDefaultCfgWrapper()
defer wcfgCancel()
wcfg, fcfg := newDefaultCfgWrapper(t)
m := setupModel(t, wcfg)
defer cleanupModel(m)
@@ -3558,9 +3537,8 @@ func TestAddFolderCompletion(t *testing.T) {
}
func TestScanDeletedROChangedOnSR(t *testing.T) {
m, conn, fcfg, wCancel := setupModelWithConnection(t)
m, conn, fcfg := setupModelWithConnection(t)
ffs := fcfg.Filesystem()
defer wCancel()
defer cleanupModelAndRemoveDir(m, ffs.URI())
fcfg.Type = config.FolderTypeReceiveOnly
setFolder(t, m.cfg, fcfg)
@@ -3600,8 +3578,7 @@ func TestScanDeletedROChangedOnSR(t *testing.T) {
func testConfigChangeTriggersClusterConfigs(t *testing.T, expectFirst, expectSecond bool, pre func(config.Wrapper), fn func(config.Wrapper)) {
t.Helper()
wcfg, _, wcfgCancel := newDefaultCfgWrapper()
defer wcfgCancel()
wcfg, _ := newDefaultCfgWrapper(t)
m := setupModel(t, wcfg)
defer cleanupModel(m)
@@ -3663,8 +3640,7 @@ func testConfigChangeTriggersClusterConfigs(t *testing.T, expectFirst, expectSec
// That then causes these files to be considered as needed, while they are not.
// https://github.com/syncthing/syncthing/issues/6961
func TestIssue6961(t *testing.T) {
wcfg, fcfg, wcfgCancel := newDefaultCfgWrapper()
defer wcfgCancel()
wcfg, fcfg := newDefaultCfgWrapper(t)
tfs := fcfg.Filesystem()
waiter, err := wcfg.Modify(func(cfg *config.Configuration) {
cfg.SetDevice(newDeviceConfiguration(cfg.Defaults.Device, device2, "device2"))
@@ -3728,8 +3704,7 @@ func TestIssue6961(t *testing.T) {
}
func TestCompletionEmptyGlobal(t *testing.T) {
m, conn, fcfg, wcfgCancel := setupModelWithConnection(t)
defer wcfgCancel()
m, conn, fcfg := setupModelWithConnection(t)
defer cleanupModelAndRemoveDir(m, fcfg.Filesystem().URI())
files := []protocol.FileInfo{{Name: "foo", Version: protocol.Vector{}.Update(myID.Short()), Sequence: 1}}
m.sdb.Update(fcfg.ID, protocol.LocalDeviceID, files)
@@ -3743,8 +3718,7 @@ func TestCompletionEmptyGlobal(t *testing.T) {
}
func TestNeedMetaAfterIndexReset(t *testing.T) {
w, fcfg, wCancel := newDefaultCfgWrapper()
defer wCancel()
w, fcfg := newDefaultCfgWrapper(t)
addDevice2(t, w, fcfg)
m := setupModel(t, w)
defer cleanupModelAndRemoveDir(m, fcfg.Path)
@@ -3786,8 +3760,7 @@ func TestCcCheckEncryption(t *testing.T) {
t.Skip("skipping on short testing - generating encryption tokens is slow")
}
w, fcfg, wCancel := newDefaultCfgWrapper()
defer wCancel()
w, fcfg := newDefaultCfgWrapper(t)
m := setupModel(t, w)
m.cancel()
defer cleanupModel(m)
@@ -3927,8 +3900,7 @@ func TestCcCheckEncryption(t *testing.T) {
func TestCCFolderNotRunning(t *testing.T) {
// Create the folder, but don't start it.
w, fcfg, wCancel := newDefaultCfgWrapper()
defer wCancel()
w, fcfg := newDefaultCfgWrapper(t)
tfs := fcfg.Filesystem()
m := newModel(t, w, myID, nil)
defer cleanupModelAndRemoveDir(m, tfs.URI())
@@ -3955,8 +3927,7 @@ func TestCCFolderNotRunning(t *testing.T) {
}
func TestPendingFolder(t *testing.T) {
w, _, wCancel := newDefaultCfgWrapper()
defer wCancel()
w, _ := newDefaultCfgWrapper(t)
m := setupModel(t, w)
defer cleanupModel(m)
@@ -4035,11 +4006,10 @@ func TestDeletedNotLocallyChangedReceiveEncrypted(t *testing.T) {
}
func deletedNotLocallyChanged(t *testing.T, ft config.FolderType) {
w, fcfg, wCancel := newDefaultCfgWrapper()
w, fcfg := newDefaultCfgWrapper(t)
tfs := fcfg.Filesystem()
fcfg.Type = ft
setFolder(t, w, fcfg)
defer wCancel()
m := setupModel(t, w)
defer cleanupModelAndRemoveDir(m, tfs.URI())

View File

@@ -30,8 +30,7 @@ func TestRequestSimple(t *testing.T) {
// Verify that the model performs a request and creates a file based on
// an incoming index update.
m, fc, fcfg, wcfgCancel := setupModelWithConnection(t)
defer wcfgCancel()
m, fc, fcfg := setupModelWithConnection(t)
tfs := fcfg.Filesystem()
defer cleanupModelAndRemoveDir(m, tfs.URI())
@@ -78,8 +77,7 @@ func TestSymlinkTraversalRead(t *testing.T) {
return
}
m, fc, fcfg, wcfgCancel := setupModelWithConnection(t)
defer wcfgCancel()
m, fc, fcfg := setupModelWithConnection(t)
defer cleanupModelAndRemoveDir(m, fcfg.Filesystem().URI())
// We listen for incoming index updates and trigger when we see one for
@@ -121,8 +119,7 @@ func TestSymlinkTraversalWrite(t *testing.T) {
return
}
m, fc, fcfg, wcfgCancel := setupModelWithConnection(t)
defer wcfgCancel()
m, fc, fcfg := setupModelWithConnection(t)
defer cleanupModelAndRemoveDir(m, fcfg.Filesystem().URI())
// We listen for incoming index updates and trigger when we see one for
@@ -180,8 +177,7 @@ func TestSymlinkTraversalWrite(t *testing.T) {
func TestRequestCreateTmpSymlink(t *testing.T) {
// Test that an update for a temporary file is invalidated
m, fc, fcfg, wcfgCancel := setupModelWithConnection(t)
defer wcfgCancel()
m, fc, fcfg := setupModelWithConnection(t)
defer cleanupModelAndRemoveDir(m, fcfg.Filesystem().URI())
// We listen for incoming index updates and trigger when we see one for
@@ -356,8 +352,7 @@ func pullInvalidIgnored(t *testing.T, ft config.FolderType) {
}
func TestIssue4841(t *testing.T) {
m, fc, fcfg, wcfgCancel := setupModelWithConnection(t)
defer wcfgCancel()
m, fc, fcfg := setupModelWithConnection(t)
defer cleanupModelAndRemoveDir(m, fcfg.Filesystem().URI())
received := make(chan []protocol.FileInfo)
@@ -405,8 +400,7 @@ func TestIssue4841(t *testing.T) {
}
func TestRescanIfHaveInvalidContent(t *testing.T) {
m, fc, fcfg, wcfgCancel := setupModelWithConnection(t)
defer wcfgCancel()
m, fc, fcfg := setupModelWithConnection(t)
tfs := fcfg.Filesystem()
defer cleanupModelAndRemoveDir(m, tfs.URI())
@@ -467,8 +461,7 @@ func TestRescanIfHaveInvalidContent(t *testing.T) {
func TestParentDeletion(t *testing.T) {
t.Skip("flaky")
m, fc, fcfg, wcfgCancel := setupModelWithConnection(t)
defer wcfgCancel()
m, fc, fcfg := setupModelWithConnection(t)
testFs := fcfg.Filesystem()
defer cleanupModelAndRemoveDir(m, testFs.URI())
@@ -546,8 +539,7 @@ func TestRequestSymlinkWindows(t *testing.T) {
t.Skip("windows specific test")
}
m, fc, fcfg, wcfgCancel := setupModelWithConnection(t)
defer wcfgCancel()
m, fc, fcfg := setupModelWithConnection(t)
defer cleanupModelAndRemoveDir(m, fcfg.Filesystem().URI())
received := make(chan []protocol.FileInfo)
@@ -623,8 +615,7 @@ func equalContents(fs fs.Filesystem, path string, contents []byte) error {
func TestRequestRemoteRenameChanged(t *testing.T) {
t.Skip("flaky")
m, fc, fcfg, wcfgCancel := setupModelWithConnection(t)
defer wcfgCancel()
m, fc, fcfg := setupModelWithConnection(t)
tfs := fcfg.Filesystem()
defer cleanupModel(m)
@@ -756,8 +747,7 @@ func TestRequestRemoteRenameChanged(t *testing.T) {
}
func TestRequestRemoteRenameConflict(t *testing.T) {
m, fc, fcfg, wcfgCancel := setupModelWithConnection(t)
defer wcfgCancel()
m, fc, fcfg := setupModelWithConnection(t)
tfs := fcfg.Filesystem()
defer cleanupModel(m)
@@ -846,8 +836,7 @@ func TestRequestRemoteRenameConflict(t *testing.T) {
}
func TestRequestDeleteChanged(t *testing.T) {
m, fc, fcfg, wcfgCancel := setupModelWithConnection(t)
defer wcfgCancel()
m, fc, fcfg := setupModelWithConnection(t)
tfs := fcfg.Filesystem()
defer cleanupModelAndRemoveDir(m, tfs.URI())
@@ -925,8 +914,7 @@ func TestRequestDeleteChanged(t *testing.T) {
}
func TestNeedFolderFiles(t *testing.T) {
m, fc, fcfg, wcfgCancel := setupModelWithConnection(t)
defer wcfgCancel()
m, fc, fcfg := setupModelWithConnection(t)
defer cleanupModel(m)
sub := m.evLogger.Subscribe(events.RemoteIndexUpdated)
@@ -970,8 +958,7 @@ func TestNeedFolderFiles(t *testing.T) {
// propagated upon un-ignoring.
// https://github.com/syncthing/syncthing/issues/6038
func TestIgnoreDeleteUnignore(t *testing.T) {
w, fcfg, wCancel := newDefaultCfgWrapper()
defer wCancel()
w, fcfg := newDefaultCfgWrapper(t)
m := setupModel(t, w)
fss := fcfg.Filesystem()
defer cleanupModel(m)
@@ -1065,8 +1052,7 @@ func TestIgnoreDeleteUnignore(t *testing.T) {
// TestRequestLastFileProgress checks that the last pulled file (here only) is registered
// as in progress.
func TestRequestLastFileProgress(t *testing.T) {
m, fc, fcfg, wcfgCancel := setupModelWithConnection(t)
defer wcfgCancel()
m, fc, fcfg := setupModelWithConnection(t)
tfs := fcfg.Filesystem()
defer cleanupModelAndRemoveDir(m, tfs.URI())
@@ -1100,8 +1086,7 @@ func TestRequestIndexSenderPause(t *testing.T) {
done := make(chan struct{})
defer close(done)
m, fc, fcfg, wcfgCancel := setupModelWithConnection(t)
defer wcfgCancel()
m, fc, fcfg := setupModelWithConnection(t)
tfs := fcfg.Filesystem()
defer cleanupModelAndRemoveDir(m, tfs.URI())
@@ -1213,8 +1198,7 @@ func TestRequestIndexSenderPause(t *testing.T) {
}
func TestRequestIndexSenderClusterConfigBeforeStart(t *testing.T) {
w, fcfg, wCancel := newDefaultCfgWrapper()
defer wCancel()
w, fcfg := newDefaultCfgWrapper(t)
tfs := fcfg.Filesystem()
dir1 := "foo"
dir2 := "bar"
@@ -1280,8 +1264,7 @@ func TestRequestReceiveEncrypted(t *testing.T) {
t.Skip("skipping on short testing - scrypt is too slow")
}
w, fcfg, wCancel := newDefaultCfgWrapper()
defer wCancel()
w, fcfg := newDefaultCfgWrapper(t)
tfs := fcfg.Filesystem()
fcfg.Type = config.FolderTypeReceiveEncrypted
setFolder(t, w, fcfg)
@@ -1375,8 +1358,7 @@ func TestRequestGlobalInvalidToValid(t *testing.T) {
done := make(chan struct{})
defer close(done)
m, fc, fcfg, wcfgCancel := setupModelWithConnection(t)
defer wcfgCancel()
m, fc, fcfg := setupModelWithConnection(t)
fcfg.Devices = append(fcfg.Devices, config.FolderDeviceConfiguration{DeviceID: device2})
waiter, err := m.cfg.Modify(func(cfg *config.Configuration) {
cfg.SetDevice(newDeviceConfiguration(cfg.Defaults.Device, device2, "device2"))

View File

@@ -85,19 +85,24 @@ func init() {
}
func newConfigWrapper(cfg config.Configuration) (config.Wrapper, context.CancelFunc) {
wrapper := config.Wrap("", cfg, myID, events.NoopLogger)
ctx, cancel := context.WithCancel(context.Background())
go wrapper.Serve(ctx)
return wrapper, cancel
return newConfigWrapperFromContext(context.Background(), cfg)
}
func newDefaultCfgWrapper() (config.Wrapper, config.FolderConfiguration, context.CancelFunc) {
w, cancel := newConfigWrapper(defaultCfgWrapper.RawCopy())
func newDefaultCfgWrapper(t testing.TB) (config.Wrapper, config.FolderConfiguration) {
w, cancel := newConfigWrapperFromContext(t.Context(), defaultCfgWrapper.RawCopy())
t.Cleanup(cancel)
fcfg := newFolderConfig()
_, _ = w.Modify(func(cfg *config.Configuration) {
cfg.SetFolder(fcfg)
})
return w, fcfg, cancel
return w, fcfg
}
func newConfigWrapperFromContext(ctx context.Context, cfg config.Configuration) (config.Wrapper, context.CancelFunc) {
wrapper := config.Wrap("", cfg, myID, events.NoopLogger)
ctx, cancel := context.WithCancel(ctx)
go wrapper.Serve(ctx)
return wrapper, cancel
}
func newFolderConfig() config.FolderConfiguration {
@@ -108,11 +113,11 @@ func newFolderConfig() config.FolderConfiguration {
return cfg
}
func setupModelWithConnection(t testing.TB) (*testModel, *fakeConnection, config.FolderConfiguration, context.CancelFunc) {
func setupModelWithConnection(t testing.TB) (*testModel, *fakeConnection, config.FolderConfiguration) {
t.Helper()
w, fcfg, cancel := newDefaultCfgWrapper()
w, fcfg := newDefaultCfgWrapper(t)
m, fc := setupModelWithConnectionFromWrapper(t, w)
return m, fc, fcfg, cancel
return m, fc, fcfg
}
func setupModelWithConnectionFromWrapper(t testing.TB, w config.Wrapper) (*testModel, *fakeConnection) {
@@ -157,7 +162,7 @@ func newModel(t testing.TB, cfg config.Wrapper, id protocol.DeviceID, protectedF
mdb.Close()
})
m := NewModel(cfg, id, mdb, protectedFiles, evLogger, protocol.NewKeyGenerator()).(*model)
ctx, cancel := context.WithCancel(context.Background())
ctx, cancel := context.WithCancel(t.Context())
go evLogger.Serve(ctx)
return &testModel{
model: m,