diff --git a/cmd/syncthing/main.go b/cmd/syncthing/main.go index 86847c6ae..c65994041 100644 --- a/cmd/syncthing/main.go +++ b/cmd/syncthing/main.go @@ -634,12 +634,12 @@ func syncthingMain(runtimeOptions RuntimeOptions) { os.Exit(1) } - // Check if auto-upgrades should be done and if yes, do an initial + // Check if auto-upgrades is possible, and if yes, and it's enabled do an initial // upgrade immedately. The auto-upgrade routine can only be started // later after App is initialised. - shouldAutoUpgrade := shouldUpgrade(cfg, runtimeOptions) - if shouldAutoUpgrade { + autoUpgradePossible := autoUpgradePossible(runtimeOptions) + if autoUpgradePossible && cfg.Options().AutoUpgradeEnabled() { // try to do upgrade directly and log the error if relevant. release, err := initialAutoUpgradeCheck(db.NewMiscDataNamespace(ldb)) if err == nil { @@ -680,7 +680,7 @@ func syncthingMain(runtimeOptions RuntimeOptions) { app := syncthing.New(cfg, ldb, evLogger, cert, appOpts) - if shouldAutoUpgrade { + if autoUpgradePossible { go autoUpgrade(cfg, app, evLogger) } @@ -702,9 +702,7 @@ func syncthingMain(runtimeOptions RuntimeOptions) { } } - if opts := cfg.Options(); opts.RestartOnWakeup { - go standbyMonitor(app) - } + go standbyMonitor(app, cfg) if err := app.Start(); err != nil { os.Exit(syncthing.ExitError.AsInt()) @@ -818,12 +816,12 @@ func ensureDir(dir string, mode fs.FileMode) error { return nil } -func standbyMonitor(app *syncthing.App) { +func standbyMonitor(app *syncthing.App, cfg config.Wrapper) { restartDelay := 60 * time.Second now := time.Now() for { time.Sleep(10 * time.Second) - if time.Since(now) > 2*time.Minute { + if time.Since(now) > 2*time.Minute && cfg.Options().RestartOnWakeup { l.Infof("Paused state detected, possibly woke up from standby. Restarting in %v.", restartDelay) // We most likely just woke from standby. If we restart @@ -838,13 +836,10 @@ func standbyMonitor(app *syncthing.App) { } } -func shouldUpgrade(cfg config.Wrapper, runtimeOptions RuntimeOptions) bool { +func autoUpgradePossible(runtimeOptions RuntimeOptions) bool { if upgrade.DisabledByCompilation { return false } - if !cfg.Options().ShouldAutoUpgrade() { - return false - } if runtimeOptions.NoUpgrade { l.Infof("No automatic upgrades; STNOUPGRADE environment variable defined.") return false @@ -862,18 +857,19 @@ func autoUpgrade(cfg config.Wrapper, app *syncthing.App, evLogger events.Logger) if !ok || data["clientName"] != "syncthing" || upgrade.CompareVersions(data["clientVersion"], build.Version) != upgrade.Newer { continue } - l.Infof("Connected to device %s with a newer version (current %q < remote %q). Checking for upgrades.", data["id"], build.Version, data["clientVersion"]) + if cfg.Options().AutoUpgradeEnabled() { + l.Infof("Connected to device %s with a newer version (current %q < remote %q). Checking for upgrades.", data["id"], build.Version, data["clientVersion"]) + } case <-timer.C: } opts := cfg.Options() - checkInterval := time.Duration(opts.AutoUpgradeIntervalH) * time.Hour - if checkInterval < time.Hour { - // We shouldn't be here if AutoUpgradeIntervalH < 1, but for - // safety's sake. - checkInterval = time.Hour + if !opts.AutoUpgradeEnabled() { + timer.Reset(upgradeCheckInterval) + continue } + checkInterval := time.Duration(opts.AutoUpgradeIntervalH) * time.Hour rel, err := upgrade.LatestRelease(opts.ReleasesURL, build.Version, opts.UpgradeToPreReleases) if err == upgrade.ErrUpgradeUnsupported { sub.Unsubscribe() diff --git a/lib/api/api.go b/lib/api/api.go index e5ec85be6..15a76bf80 100644 --- a/lib/api/api.go +++ b/lib/api/api.go @@ -77,7 +77,7 @@ type service struct { eventSubs map[events.EventType]events.BufferedSubscription eventSubsMut sync.Mutex evLogger events.Logger - discoverer discover.CachingMux + discoverer discover.Manager connectionsService connections.Service fss model.FolderSummaryService urService *ur.Service @@ -107,7 +107,7 @@ type Service interface { WaitForStart() error } -func New(id protocol.DeviceID, cfg config.Wrapper, assetDir, tlsDefaultCommonName string, m model.Model, defaultSub, diskSub events.BufferedSubscription, evLogger events.Logger, discoverer discover.CachingMux, connectionsService connections.Service, urService *ur.Service, fss model.FolderSummaryService, errors, systemLog logger.Recorder, contr Controller, noUpgrade bool) Service { +func New(id protocol.DeviceID, cfg config.Wrapper, assetDir, tlsDefaultCommonName string, m model.Model, defaultSub, diskSub events.BufferedSubscription, evLogger events.Logger, discoverer discover.Manager, connectionsService connections.Service, urService *ur.Service, fss model.FolderSummaryService, errors, systemLog logger.Recorder, contr Controller, noUpgrade bool) Service { s := &service{ id: id, cfg: cfg, diff --git a/lib/api/mocked_discovery_test.go b/lib/api/mocked_discovery_test.go index eb0c11a20..e560a58aa 100644 --- a/lib/api/mocked_discovery_test.go +++ b/lib/api/mocked_discovery_test.go @@ -8,7 +8,6 @@ package api import ( "context" - "time" "github.com/syncthing/syncthing/lib/discover" "github.com/syncthing/syncthing/lib/protocol" @@ -43,10 +42,7 @@ func (m *mockedCachingMux) Cache() map[protocol.DeviceID]discover.CacheEntry { return nil } -// from events.CachingMux - -func (m *mockedCachingMux) Add(finder discover.Finder, cacheTime, negCacheTime time.Duration) { -} +// from events.Manager func (m *mockedCachingMux) ChildErrors() map[string]error { return nil diff --git a/lib/config/migrations.go b/lib/config/migrations.go index bc126d12d..185287603 100644 --- a/lib/config/migrations.go +++ b/lib/config/migrations.go @@ -215,7 +215,7 @@ func migrateToConfigV18(cfg *Configuration) { // Do channel selection for existing users. Those who have auto upgrades // and usage reporting on default to the candidate channel. Others get // stable. - if cfg.Options.URAccepted > 0 && cfg.Options.AutoUpgradeIntervalH > 0 { + if cfg.Options.URAccepted > 0 && cfg.Options.AutoUpgradeEnabled() { cfg.Options.UpgradeToPreReleases = true } diff --git a/lib/config/optionsconfiguration.go b/lib/config/optionsconfiguration.go index 71a9956e3..d73426d29 100644 --- a/lib/config/optionsconfiguration.go +++ b/lib/config/optionsconfiguration.go @@ -17,11 +17,11 @@ import ( type OptionsConfiguration struct { RawListenAddresses []string `xml:"listenAddress" json:"listenAddresses" default:"default"` - RawGlobalAnnServers []string `xml:"globalAnnounceServer" json:"globalAnnounceServers" default:"default" restart:"true"` - GlobalAnnEnabled bool `xml:"globalAnnounceEnabled" json:"globalAnnounceEnabled" default:"true" restart:"true"` - LocalAnnEnabled bool `xml:"localAnnounceEnabled" json:"localAnnounceEnabled" default:"true" restart:"true"` - LocalAnnPort int `xml:"localAnnouncePort" json:"localAnnouncePort" default:"21027" restart:"true"` - LocalAnnMCAddr string `xml:"localAnnounceMCAddr" json:"localAnnounceMCAddr" default:"[ff12::8384]:21027" restart:"true"` + RawGlobalAnnServers []string `xml:"globalAnnounceServer" json:"globalAnnounceServers" default:"default"` + GlobalAnnEnabled bool `xml:"globalAnnounceEnabled" json:"globalAnnounceEnabled" default:"true"` + LocalAnnEnabled bool `xml:"localAnnounceEnabled" json:"localAnnounceEnabled" default:"true"` + LocalAnnPort int `xml:"localAnnouncePort" json:"localAnnouncePort" default:"21027"` + LocalAnnMCAddr string `xml:"localAnnounceMCAddr" json:"localAnnounceMCAddr" default:"[ff12::8384]:21027"` MaxSendKbps int `xml:"maxSendKbps" json:"maxSendKbps"` MaxRecvKbps int `xml:"maxRecvKbps" json:"maxRecvKbps"` ReconnectIntervalS int `xml:"reconnectionIntervalS" json:"reconnectionIntervalS" default:"60"` @@ -38,15 +38,15 @@ type OptionsConfiguration struct { URURL string `xml:"urURL" json:"urURL" default:"https://data.syncthing.net/newdata"` // usage reporting URL URPostInsecurely bool `xml:"urPostInsecurely" json:"urPostInsecurely" default:"false"` // For testing URInitialDelayS int `xml:"urInitialDelayS" json:"urInitialDelayS" default:"1800"` - RestartOnWakeup bool `xml:"restartOnWakeup" json:"restartOnWakeup" default:"true" restart:"true"` - AutoUpgradeIntervalH int `xml:"autoUpgradeIntervalH" json:"autoUpgradeIntervalH" default:"12" restart:"true"` // 0 for off - UpgradeToPreReleases bool `xml:"upgradeToPreReleases" json:"upgradeToPreReleases" restart:"true"` // when auto upgrades are enabled - KeepTemporariesH int `xml:"keepTemporariesH" json:"keepTemporariesH" default:"24"` // 0 for off - CacheIgnoredFiles bool `xml:"cacheIgnoredFiles" json:"cacheIgnoredFiles" default:"false" restart:"true"` + RestartOnWakeup bool `xml:"restartOnWakeup" json:"restartOnWakeup" default:"true"` + AutoUpgradeIntervalH int `xml:"autoUpgradeIntervalH" json:"autoUpgradeIntervalH" default:"12"` // 0 for off + UpgradeToPreReleases bool `xml:"upgradeToPreReleases" json:"upgradeToPreReleases"` // when auto upgrades are enabled + KeepTemporariesH int `xml:"keepTemporariesH" json:"keepTemporariesH" default:"24"` // 0 for off + CacheIgnoredFiles bool `xml:"cacheIgnoredFiles" json:"cacheIgnoredFiles" default:"false"` ProgressUpdateIntervalS int `xml:"progressUpdateIntervalS" json:"progressUpdateIntervalS" default:"5"` LimitBandwidthInLan bool `xml:"limitBandwidthInLan" json:"limitBandwidthInLan" default:"false"` MinHomeDiskFree Size `xml:"minHomeDiskFree" json:"minHomeDiskFree" default:"1 %"` - ReleasesURL string `xml:"releasesURL" json:"releasesURL" default:"https://upgrades.syncthing.net/meta.json" restart:"true"` + ReleasesURL string `xml:"releasesURL" json:"releasesURL" default:"https://upgrades.syncthing.net/meta.json"` AlwaysLocalNets []string `xml:"alwaysLocalNet" json:"alwaysLocalNets"` OverwriteRemoteDevNames bool `xml:"overwriteRemoteDeviceNamesOnConnect" json:"overwriteRemoteDeviceNamesOnConnect" default:"false"` TempIndexMinBlocks int `xml:"tempIndexMinBlocks" json:"tempIndexMinBlocks" default:"10"` @@ -56,11 +56,11 @@ type OptionsConfiguration struct { SetLowPriority bool `xml:"setLowPriority" json:"setLowPriority" default:"true"` RawMaxFolderConcurrency int `xml:"maxFolderConcurrency" json:"maxFolderConcurrency"` CRURL string `xml:"crashReportingURL" json:"crURL" default:"https://crash.syncthing.net/newcrash"` // crash reporting URL - CREnabled bool `xml:"crashReportingEnabled" json:"crashReportingEnabled" default:"true" restart:"true"` - StunKeepaliveStartS int `xml:"stunKeepaliveStartS" json:"stunKeepaliveStartS" default:"180"` // 0 for off - StunKeepaliveMinS int `xml:"stunKeepaliveMinS" json:"stunKeepaliveMinS" default:"20"` // 0 for off + CREnabled bool `xml:"crashReportingEnabled" json:"crashReportingEnabled" default:"true"` // Read in the monitor, but it's read before every attempt to report stuff, so does not require a restart. + StunKeepaliveStartS int `xml:"stunKeepaliveStartS" json:"stunKeepaliveStartS" default:"180"` // 0 for off + StunKeepaliveMinS int `xml:"stunKeepaliveMinS" json:"stunKeepaliveMinS" default:"20"` // 0 for off RawStunServers []string `xml:"stunServer" json:"stunServers" default:"default"` - DatabaseTuning Tuning `xml:"databaseTuning" json:"databaseTuning" restart:"true"` + DatabaseTuning Tuning `xml:"databaseTuning" json:"databaseTuning" restart:"true"` // Can't be adjusted once the database has been opened RawMaxCIRequestKiB int `xml:"maxConcurrentIncomingRequestKiB" json:"maxConcurrentIncomingRequestKiB"` DeprecatedUPnPEnabled bool `xml:"upnpEnabled,omitempty" json:"-"` @@ -201,6 +201,6 @@ func (opts OptionsConfiguration) MaxConcurrentIncomingRequestKiB() int { return opts.RawMaxCIRequestKiB } -func (opts OptionsConfiguration) ShouldAutoUpgrade() bool { +func (opts OptionsConfiguration) AutoUpgradeEnabled() bool { return opts.AutoUpgradeIntervalH > 0 } diff --git a/lib/discover/cache.go b/lib/discover/cache.go index fc74fe85c..b22255020 100644 --- a/lib/discover/cache.go +++ b/lib/discover/cache.go @@ -7,41 +7,21 @@ package discover import ( - "context" - "sort" stdsync "sync" "time" - "github.com/syncthing/syncthing/lib/protocol" - "github.com/syncthing/syncthing/lib/sync" - "github.com/syncthing/syncthing/lib/util" "github.com/thejerf/suture" + + "github.com/syncthing/syncthing/lib/protocol" ) -// The CachingMux aggregates results from multiple Finders. Each Finder has -// an associated cache time and negative cache time. The cache time sets how -// long we cache and return successful lookup results, the negative cache -// time sets how long we refrain from asking about the same device ID after -// receiving a negative answer. The value of zero disables caching (positive -// or negative). -type CachingMux interface { - FinderService - Add(finder Finder, cacheTime, negCacheTime time.Duration) - ChildErrors() map[string]error -} - -type cachingMux struct { - *suture.Supervisor - finders []cachedFinder - caches []*cache - mut sync.RWMutex -} - // A cachedFinder is a Finder with associated cache timeouts. type cachedFinder struct { Finder cacheTime time.Duration negCacheTime time.Duration + cache *cache + token *suture.ServiceToken } // An error may implement cachedError, in which case it will be interrogated @@ -51,150 +31,6 @@ type cachedError interface { CacheFor() time.Duration } -func NewCachingMux() CachingMux { - return &cachingMux{ - Supervisor: suture.New("discover.cachingMux", suture.Spec{ - PassThroughPanics: true, - }), - mut: sync.NewRWMutex(), - } -} - -// Add registers a new Finder, with associated cache timeouts. -func (m *cachingMux) Add(finder Finder, cacheTime, negCacheTime time.Duration) { - m.mut.Lock() - m.finders = append(m.finders, cachedFinder{finder, cacheTime, negCacheTime}) - m.caches = append(m.caches, newCache()) - m.mut.Unlock() - - if service, ok := finder.(suture.Service); ok { - m.Supervisor.Add(service) - } -} - -// Lookup attempts to resolve the device ID using any of the added Finders, -// while obeying the cache settings. -func (m *cachingMux) Lookup(ctx context.Context, deviceID protocol.DeviceID) (addresses []string, err error) { - m.mut.RLock() - for i, finder := range m.finders { - if cacheEntry, ok := m.caches[i].Get(deviceID); ok { - // We have a cache entry. Lets see what it says. - - if cacheEntry.found && time.Since(cacheEntry.when) < finder.cacheTime { - // It's a positive, valid entry. Use it. - l.Debugln("cached discovery entry for", deviceID, "at", finder) - l.Debugln(" cache:", cacheEntry) - addresses = append(addresses, cacheEntry.Addresses...) - continue - } - - valid := time.Now().Before(cacheEntry.validUntil) || time.Since(cacheEntry.when) < finder.negCacheTime - if !cacheEntry.found && valid { - // It's a negative, valid entry. We should not make another - // attempt right now. - l.Debugln("negative cache entry for", deviceID, "at", finder, "valid until", cacheEntry.when.Add(finder.negCacheTime), "or", cacheEntry.validUntil) - continue - } - - // It's expired. Ignore and continue. - } - - // Perform the actual lookup and cache the result. - if addrs, err := finder.Lookup(ctx, deviceID); err == nil { - l.Debugln("lookup for", deviceID, "at", finder) - l.Debugln(" addresses:", addrs) - addresses = append(addresses, addrs...) - m.caches[i].Set(deviceID, CacheEntry{ - Addresses: addrs, - when: time.Now(), - found: len(addrs) > 0, - }) - } else { - // Lookup returned error, add a negative cache entry. - entry := CacheEntry{ - when: time.Now(), - found: false, - } - if err, ok := err.(cachedError); ok { - entry.validUntil = time.Now().Add(err.CacheFor()) - } - m.caches[i].Set(deviceID, entry) - } - } - m.mut.RUnlock() - - addresses = util.UniqueTrimmedStrings(addresses) - sort.Strings(addresses) - - l.Debugln("lookup results for", deviceID) - l.Debugln(" addresses: ", addresses) - - return addresses, nil -} - -func (m *cachingMux) String() string { - return "discovery cache" -} - -func (m *cachingMux) Error() error { - return nil -} - -func (m *cachingMux) ChildErrors() map[string]error { - children := make(map[string]error, len(m.finders)) - m.mut.RLock() - for _, f := range m.finders { - children[f.String()] = f.Error() - } - m.mut.RUnlock() - return children -} - -func (m *cachingMux) Cache() map[protocol.DeviceID]CacheEntry { - // Res will be the "total" cache, i.e. the union of our cache and all our - // children's caches. - res := make(map[protocol.DeviceID]CacheEntry) - - m.mut.RLock() - for i := range m.finders { - // Each finder[i] has a corresponding cache at cache[i]. Go through - // it and populate the total, appending any addresses and keeping - // the newest "when" time. We skip any negative cache entries. - for k, v := range m.caches[i].Cache() { - if v.found { - cur := res[k] - if v.when.After(cur.when) { - cur.when = v.when - } - cur.Addresses = append(cur.Addresses, v.Addresses...) - res[k] = cur - } - } - - // Then ask the finder itself for its cache and do the same. If this - // finder is a global discovery client, it will have no cache. If it's - // a local discovery client, this will be its current state. - for k, v := range m.finders[i].Cache() { - if v.found { - cur := res[k] - if v.when.After(cur.when) { - cur.when = v.when - } - cur.Addresses = append(cur.Addresses, v.Addresses...) - res[k] = cur - } - } - } - m.mut.RUnlock() - - for k, v := range res { - v.Addresses = util.UniqueTrimmedStrings(v.Addresses) - res[k] = v - } - - return res -} - // A cache can be embedded wherever useful type cache struct { diff --git a/lib/discover/cache_test.go b/lib/discover/cache_test.go index e44c18446..d6dc4a081 100644 --- a/lib/discover/cache_test.go +++ b/lib/discover/cache_test.go @@ -8,10 +8,13 @@ package discover import ( "context" + "crypto/tls" "reflect" "testing" "time" + "github.com/syncthing/syncthing/lib/config" + "github.com/syncthing/syncthing/lib/events" "github.com/syncthing/syncthing/lib/protocol" ) @@ -30,15 +33,19 @@ func TestCacheUnique(t *testing.T) { "tcp://192.0.2.44:22000", } - c := NewCachingMux() - c.(*cachingMux).ServeBackground() + cfg := config.New(protocol.LocalDeviceID) + cfg.Options.LocalAnnEnabled = false + cfg.Options.GlobalAnnEnabled = false + + c := NewManager(protocol.LocalDeviceID, config.Wrap("", cfg, events.NoopLogger), tls.Certificate{}, events.NoopLogger, nil).(*manager) + c.ServeBackground() defer c.Stop() // Add a fake discovery service and verify we get its answers through the // cache. f1 := &fakeDiscovery{addresses0} - c.Add(f1, time.Minute, 0) + c.addLocked("f1", f1, time.Minute, 0) ctx := context.Background() @@ -54,7 +61,7 @@ func TestCacheUnique(t *testing.T) { // duplicate or otherwise mess up the responses now. f2 := &fakeDiscovery{addresses1} - c.Add(f2, time.Minute, 0) + c.addLocked("f2", f2, time.Minute, 0) addr, err = c.Lookup(ctx, protocol.LocalDeviceID) if err != nil { @@ -86,15 +93,19 @@ func (f *fakeDiscovery) Cache() map[protocol.DeviceID]CacheEntry { } func TestCacheSlowLookup(t *testing.T) { - c := NewCachingMux() - c.(*cachingMux).ServeBackground() + cfg := config.New(protocol.LocalDeviceID) + cfg.Options.LocalAnnEnabled = false + cfg.Options.GlobalAnnEnabled = false + + c := NewManager(protocol.LocalDeviceID, config.Wrap("", cfg, events.NoopLogger), tls.Certificate{}, events.NoopLogger, nil).(*manager) + c.ServeBackground() defer c.Stop() // Add a slow discovery service. started := make(chan struct{}) f1 := &slowDiscovery{time.Second, started} - c.Add(f1, time.Minute, 0) + c.addLocked("f1", f1, time.Minute, 0) // Start a lookup, which will take at least a second diff --git a/lib/discover/discover.go b/lib/discover/discover.go index 383049693..d866aa16a 100644 --- a/lib/discover/discover.go +++ b/lib/discover/discover.go @@ -37,11 +37,6 @@ type FinderService interface { suture.Service } -type FinderMux interface { - Finder - ChildStatus() map[string]error -} - // The AddressLister answers questions about what addresses we are listening // on. type AddressLister interface { diff --git a/lib/discover/global.go b/lib/discover/global.go index bcd96464e..9aa2e09d4 100644 --- a/lib/discover/global.go +++ b/lib/discover/global.go @@ -12,6 +12,7 @@ import ( "crypto/tls" "encoding/json" "errors" + "fmt" "io" "io/ioutil" "net/http" @@ -448,3 +449,15 @@ func (c *contextClient) Post(ctx context.Context, url, ctype string, data io.Rea req.Header.Set("Content-Type", ctype) return c.Client.Do(req) } + +func globalDiscoveryIdentity(addr string) string { + return "global discovery server " + addr +} + +func ipv4Identity(port int) string { + return fmt.Sprintf("IPv4 local broadcast discovery on port %d", port) +} + +func ipv6Identity(addr string) string { + return fmt.Sprintf("IPv6 local multicast discovery on address %s", addr) +} diff --git a/lib/discover/manager.go b/lib/discover/manager.go new file mode 100644 index 000000000..1e2b34b0f --- /dev/null +++ b/lib/discover/manager.go @@ -0,0 +1,297 @@ +// Copyright (C) 2020 The Syncthing Authors. +// +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this file, +// You can obtain one at https://mozilla.org/MPL/2.0/. + +package discover + +import ( + "context" + "crypto/tls" + "fmt" + "sort" + "time" + + "github.com/thejerf/suture" + + "github.com/syncthing/syncthing/lib/config" + "github.com/syncthing/syncthing/lib/events" + "github.com/syncthing/syncthing/lib/protocol" + "github.com/syncthing/syncthing/lib/sync" + "github.com/syncthing/syncthing/lib/util" +) + +// The Manager aggregates results from multiple Finders. Each Finder has +// an associated cache time and negative cache time. The cache time sets how +// long we cache and return successful lookup results, the negative cache +// time sets how long we refrain from asking about the same device ID after +// receiving a negative answer. The value of zero disables caching (positive +// or negative). +type Manager interface { + FinderService + ChildErrors() map[string]error +} + +type manager struct { + *suture.Supervisor + myID protocol.DeviceID + cfg config.Wrapper + cert tls.Certificate + evLogger events.Logger + addressLister AddressLister + + finders map[string]cachedFinder + mut sync.RWMutex +} + +func NewManager(myID protocol.DeviceID, cfg config.Wrapper, cert tls.Certificate, evLogger events.Logger, lister AddressLister) Manager { + return &manager{ + Supervisor: suture.New("discover.Manager", suture.Spec{ + PassThroughPanics: true, + }), + myID: myID, + cfg: cfg, + cert: cert, + evLogger: evLogger, + addressLister: lister, + + finders: make(map[string]cachedFinder), + mut: sync.NewRWMutex(), + } +} + +func (m *manager) Serve() { + m.cfg.Subscribe(m) + defer m.cfg.Unsubscribe(m) + m.CommitConfiguration(config.Configuration{}, m.cfg.RawCopy()) + m.Supervisor.Serve() +} + +func (m *manager) addLocked(identity string, finder Finder, cacheTime, negCacheTime time.Duration) { + entry := cachedFinder{ + Finder: finder, + cacheTime: cacheTime, + negCacheTime: negCacheTime, + cache: newCache(), + token: nil, + } + if service, ok := finder.(suture.Service); ok { + token := m.Supervisor.Add(service) + entry.token = &token + } + m.finders[identity] = entry + l.Infoln("Using discovery mechanism:", identity) +} + +func (m *manager) removeLocked(identity string) { + entry, ok := m.finders[identity] + if !ok { + return + } + if entry.token != nil { + err := m.Supervisor.Remove(*entry.token) + if err != nil { + l.Warnf("removing discovery %s: %s", identity, err) + } + } + delete(m.finders, identity) + l.Infoln("Stopped using discovery mechanism: ", identity) +} + +// Lookup attempts to resolve the device ID using any of the added Finders, +// while obeying the cache settings. +func (m *manager) Lookup(ctx context.Context, deviceID protocol.DeviceID) (addresses []string, err error) { + m.mut.RLock() + for _, finder := range m.finders { + if cacheEntry, ok := finder.cache.Get(deviceID); ok { + // We have a cache entry. Lets see what it says. + + if cacheEntry.found && time.Since(cacheEntry.when) < finder.cacheTime { + // It's a positive, valid entry. Use it. + l.Debugln("cached discovery entry for", deviceID, "at", finder) + l.Debugln(" cache:", cacheEntry) + addresses = append(addresses, cacheEntry.Addresses...) + continue + } + + valid := time.Now().Before(cacheEntry.validUntil) || time.Since(cacheEntry.when) < finder.negCacheTime + if !cacheEntry.found && valid { + // It's a negative, valid entry. We should not make another + // attempt right now. + l.Debugln("negative cache entry for", deviceID, "at", finder, "valid until", cacheEntry.when.Add(finder.negCacheTime), "or", cacheEntry.validUntil) + continue + } + + // It's expired. Ignore and continue. + } + + // Perform the actual lookup and cache the result. + if addrs, err := finder.Lookup(ctx, deviceID); err == nil { + l.Debugln("lookup for", deviceID, "at", finder) + l.Debugln(" addresses:", addrs) + addresses = append(addresses, addrs...) + finder.cache.Set(deviceID, CacheEntry{ + Addresses: addrs, + when: time.Now(), + found: len(addrs) > 0, + }) + } else { + // Lookup returned error, add a negative cache entry. + entry := CacheEntry{ + when: time.Now(), + found: false, + } + if err, ok := err.(cachedError); ok { + entry.validUntil = time.Now().Add(err.CacheFor()) + } + finder.cache.Set(deviceID, entry) + } + } + m.mut.RUnlock() + + addresses = util.UniqueTrimmedStrings(addresses) + sort.Strings(addresses) + + l.Debugln("lookup results for", deviceID) + l.Debugln(" addresses: ", addresses) + + return addresses, nil +} + +func (m *manager) String() string { + return "discovery cache" +} + +func (m *manager) Error() error { + return nil +} + +func (m *manager) ChildErrors() map[string]error { + children := make(map[string]error, len(m.finders)) + m.mut.RLock() + for _, f := range m.finders { + children[f.String()] = f.Error() + } + m.mut.RUnlock() + return children +} + +func (m *manager) Cache() map[protocol.DeviceID]CacheEntry { + // Res will be the "total" cache, i.e. the union of our cache and all our + // children's caches. + res := make(map[protocol.DeviceID]CacheEntry) + + m.mut.RLock() + for _, finder := range m.finders { + // Each finder[i] has a corresponding cache. Go through + // it and populate the total, appending any addresses and keeping + // the newest "when" time. We skip any negative cache finders. + for k, v := range finder.cache.Cache() { + if v.found { + cur := res[k] + if v.when.After(cur.when) { + cur.when = v.when + } + cur.Addresses = append(cur.Addresses, v.Addresses...) + res[k] = cur + } + } + + // Then ask the finder itself for its cache and do the same. If this + // finder is a global discovery client, it will have no cache. If it's + // a local discovery client, this will be its current state. + for k, v := range finder.Cache() { + if v.found { + cur := res[k] + if v.when.After(cur.when) { + cur.when = v.when + } + cur.Addresses = append(cur.Addresses, v.Addresses...) + res[k] = cur + } + } + } + m.mut.RUnlock() + + for k, v := range res { + v.Addresses = util.UniqueTrimmedStrings(v.Addresses) + res[k] = v + } + + return res +} + +func (m *manager) VerifyConfiguration(_, _ config.Configuration) error { + return nil +} + +func (m *manager) CommitConfiguration(_, to config.Configuration) (handled bool) { + m.mut.Lock() + defer m.mut.Unlock() + toIdentities := make(map[string]struct{}) + if to.Options.GlobalAnnEnabled { + for _, srv := range to.Options.GlobalDiscoveryServers() { + toIdentities[globalDiscoveryIdentity(srv)] = struct{}{} + } + } + + if to.Options.LocalAnnEnabled { + toIdentities[ipv4Identity(to.Options.LocalAnnPort)] = struct{}{} + toIdentities[ipv6Identity(to.Options.LocalAnnMCAddr)] = struct{}{} + } + + // Remove things that we're not expected to have. + for identity := range m.finders { + if _, ok := toIdentities[identity]; !ok { + m.removeLocked(identity) + } + } + + // Add things we don't have. + if to.Options.GlobalAnnEnabled { + for _, srv := range to.Options.GlobalDiscoveryServers() { + identity := globalDiscoveryIdentity(srv) + // Skip, if it's already running. + if _, ok := m.finders[identity]; ok { + continue + } + gd, err := NewGlobal(srv, m.cert, m.addressLister, m.evLogger) + if err != nil { + l.Warnln("Global discovery:", err) + continue + } + + // Each global discovery server gets its results cached for five + // minutes, and is not asked again for a minute when it's returned + // unsuccessfully. + m.addLocked(identity, gd, 5*time.Minute, time.Minute) + } + } + + if to.Options.LocalAnnEnabled { + // v4 broadcasts + v4Identity := ipv4Identity(to.Options.LocalAnnPort) + if _, ok := m.finders[v4Identity]; !ok { + bcd, err := NewLocal(m.myID, fmt.Sprintf(":%d", to.Options.LocalAnnPort), m.addressLister, m.evLogger) + if err != nil { + l.Warnln("IPv4 local discovery:", err) + } else { + m.addLocked(v4Identity, bcd, 0, 0) + } + } + + // v6 multicasts + v6Identity := ipv6Identity(to.Options.LocalAnnMCAddr) + if _, ok := m.finders[v6Identity]; !ok { + mcd, err := NewLocal(m.myID, to.Options.LocalAnnMCAddr, m.addressLister, m.evLogger) + if err != nil { + l.Warnln("IPv6 local discovery:", err) + } else { + m.addLocked(v6Identity, mcd, 0, 0) + } + } + } + + return true +} diff --git a/lib/model/model.go b/lib/model/model.go index 78167e31c..c1a839671 100644 --- a/lib/model/model.go +++ b/lib/model/model.go @@ -121,10 +121,9 @@ type model struct { evLogger events.Logger // constant or concurrency safe fields - finder *db.BlockFinder - progressEmitter *ProgressEmitter - shortID protocol.ShortID - cacheIgnoredFiles bool + finder *db.BlockFinder + progressEmitter *ProgressEmitter + shortID protocol.ShortID // globalRequestLimiter limits the amount of data in concurrent incoming // requests globalRequestLimiter *byteSemaphore @@ -202,7 +201,6 @@ func NewModel(cfg config.Wrapper, id protocol.DeviceID, clientName, clientVersio finder: db.NewBlockFinder(ldb), progressEmitter: NewProgressEmitter(cfg, evLogger), shortID: id.Short(), - cacheIgnoredFiles: cfg.Options().CacheIgnoredFiles, globalRequestLimiter: newByteSemaphore(1024 * cfg.Options().MaxConcurrentIncomingRequestKiB()), folderIOLimiter: newByteSemaphore(cfg.Options().MaxFolderConcurrency()), @@ -245,12 +243,13 @@ func (m *model) ServeBackground() { func (m *model) onServe() { // Add and start folders + cacheIgnoredFiles := m.cfg.Options().CacheIgnoredFiles for _, folderCfg := range m.cfg.Folders() { if folderCfg.Paused { folderCfg.CreateRoot() continue } - m.newFolder(folderCfg) + m.newFolder(folderCfg, cacheIgnoredFiles) } m.cfg.Subscribe(m) } @@ -278,8 +277,8 @@ func (m *model) StartDeadlockDetector(timeout time.Duration) { } // Need to hold lock on m.fmut when calling this. -func (m *model) addAndStartFolderLocked(cfg config.FolderConfiguration, fset *db.FileSet) { - ignores := ignore.New(cfg.Filesystem(), ignore.WithCache(m.cacheIgnoredFiles)) +func (m *model) addAndStartFolderLocked(cfg config.FolderConfiguration, fset *db.FileSet, cacheIgnoredFiles bool) { + ignores := ignore.New(cfg.Filesystem(), ignore.WithCache(cacheIgnoredFiles)) if err := ignores.Load(".stignore"); err != nil && !fs.IsNotExist(err) { l.Warnln("Loading ignores:", err) } @@ -445,7 +444,7 @@ func (m *model) cleanupFolderLocked(cfg config.FolderConfiguration) { delete(m.folderVersioners, cfg.ID) } -func (m *model) restartFolder(from, to config.FolderConfiguration) { +func (m *model) restartFolder(from, to config.FolderConfiguration, cacheIgnoredFiles bool) { if len(to.ID) == 0 { panic("bug: cannot restart empty folder ID") } @@ -495,12 +494,12 @@ func (m *model) restartFolder(from, to config.FolderConfiguration) { m.cleanupFolderLocked(from) if !to.Paused { - m.addAndStartFolderLocked(to, fset) + m.addAndStartFolderLocked(to, fset, cacheIgnoredFiles) } l.Infof("%v folder %v (%v)", infoMsg, to.Description(), to.Type) } -func (m *model) newFolder(cfg config.FolderConfiguration) { +func (m *model) newFolder(cfg config.FolderConfiguration, cacheIgnoredFiles bool) { // Creating the fileset can take a long time (metadata calculation) so // we do it outside of the lock. fset := db.NewFileSet(cfg.ID, cfg.Filesystem(), m.db) @@ -510,7 +509,7 @@ func (m *model) newFolder(cfg config.FolderConfiguration) { m.fmut.Lock() defer m.fmut.Unlock() - m.addAndStartFolderLocked(cfg, fset) + m.addAndStartFolderLocked(cfg, fset, cacheIgnoredFiles) } func (m *model) UsageReportingStats(report *contract.Report, version int, preview bool) { @@ -2471,7 +2470,7 @@ func (m *model) CommitConfiguration(from, to config.Configuration) bool { l.Infoln("Paused folder", cfg.Description()) } else { l.Infoln("Adding folder", cfg.Description()) - m.newFolder(cfg) + m.newFolder(cfg, to.Options.CacheIgnoredFiles) } } } @@ -2490,8 +2489,8 @@ func (m *model) CommitConfiguration(from, to config.Configuration) bool { // This folder exists on both sides. Settings might have changed. // Check if anything differs that requires a restart. - if !reflect.DeepEqual(fromCfg.RequiresRestartOnly(), toCfg.RequiresRestartOnly()) { - m.restartFolder(fromCfg, toCfg) + if !reflect.DeepEqual(fromCfg.RequiresRestartOnly(), toCfg.RequiresRestartOnly()) || from.Options.CacheIgnoredFiles != to.Options.CacheIgnoredFiles { + m.restartFolder(fromCfg, toCfg, to.Options.CacheIgnoredFiles) } // Emit the folder pause/resume event diff --git a/lib/model/model_test.go b/lib/model/model_test.go index f6df87692..bedc65aba 100644 --- a/lib/model/model_test.go +++ b/lib/model/model_test.go @@ -1475,7 +1475,7 @@ func TestIgnores(t *testing.T) { // Invalid path, marker should be missing, hence returns an error. fcfg := config.FolderConfiguration{ID: "fresh", Path: "XXX"} - ignores := ignore.New(fcfg.Filesystem(), ignore.WithCache(m.cacheIgnoredFiles)) + ignores := ignore.New(fcfg.Filesystem(), ignore.WithCache(m.cfg.Options().CacheIgnoredFiles)) m.fmut.Lock() m.folderCfgs[fcfg.ID] = fcfg m.folderIgnores[fcfg.ID] = ignores @@ -1490,7 +1490,7 @@ func TestIgnores(t *testing.T) { pausedDefaultFolderConfig := defaultFolderConfig pausedDefaultFolderConfig.Paused = true - m.restartFolder(defaultFolderConfig, pausedDefaultFolderConfig) + m.restartFolder(defaultFolderConfig, pausedDefaultFolderConfig, false) // Here folder initialization is not an issue as a paused folder isn't // added to the model and thus there is no initial scan happening. @@ -2290,7 +2290,7 @@ func TestIndexesForUnknownDevicesDropped(t *testing.T) { } m := newModel(defaultCfgWrapper, myID, "syncthing", "dev", dbi, nil) - m.newFolder(defaultFolderConfig) + m.newFolder(defaultFolderConfig, false) defer cleanupModel(m) // Remote sequence is cached, hence need to recreated. @@ -3343,7 +3343,7 @@ func TestConnCloseOnRestart(t *testing.T) { newFcfg.Paused = true done := make(chan struct{}) go func() { - m.restartFolder(fcfg, newFcfg) + m.restartFolder(fcfg, newFcfg, false) close(done) }() select { diff --git a/lib/syncthing/syncthing.go b/lib/syncthing/syncthing.go index 6b7615abc..6e8518b43 100644 --- a/lib/syncthing/syncthing.go +++ b/lib/syncthing/syncthing.go @@ -264,11 +264,6 @@ func (a *App) startup() error { a.mainService.Add(m) - // Start discovery - - cachedDiscovery := discover.NewCachingMux() - a.mainService.Add(cachedDiscovery) - // The TLS configuration is used for both the listening socket and outgoing // connections. @@ -279,44 +274,21 @@ func (a *App) startup() error { tlsCfg.SessionTicketsDisabled = true tlsCfg.InsecureSkipVerify = true - // Start connection management + // Start discovery and connection management - connectionsService := connections.NewService(a.cfg, a.myID, m, tlsCfg, cachedDiscovery, bepProtocolName, tlsDefaultCommonName, a.evLogger) + // Chicken and egg, discovery manager depends on connection service to tell it what addresses it's listening on + // Connection service depends on discovery manager to get addresses to connect to. + // Create a wrapper that is then wired after they are both setup. + addrLister := &lateAddressLister{} + + discoveryManager := discover.NewManager(a.myID, a.cfg, a.cert, a.evLogger, addrLister) + connectionsService := connections.NewService(a.cfg, a.myID, m, tlsCfg, discoveryManager, bepProtocolName, tlsDefaultCommonName, a.evLogger) + + addrLister.AddressLister = connectionsService + + a.mainService.Add(discoveryManager) a.mainService.Add(connectionsService) - if a.cfg.Options().GlobalAnnEnabled { - for _, srv := range a.cfg.Options().GlobalDiscoveryServers() { - l.Infoln("Using discovery server", srv) - gd, err := discover.NewGlobal(srv, a.cert, connectionsService, a.evLogger) - if err != nil { - l.Warnln("Global discovery:", err) - continue - } - - // Each global discovery server gets its results cached for five - // minutes, and is not asked again for a minute when it's returned - // unsuccessfully. - cachedDiscovery.Add(gd, 5*time.Minute, time.Minute) - } - } - - if a.cfg.Options().LocalAnnEnabled { - // v4 broadcasts - bcd, err := discover.NewLocal(a.myID, fmt.Sprintf(":%d", a.cfg.Options().LocalAnnPort), connectionsService, a.evLogger) - if err != nil { - l.Warnln("IPv4 local discovery:", err) - } else { - cachedDiscovery.Add(bcd, 0, 0) - } - // v6 multicasts - mcd, err := discover.NewLocal(a.myID, a.cfg.Options().LocalAnnMCAddr, connectionsService, a.evLogger) - if err != nil { - l.Warnln("IPv6 local discovery:", err) - } else { - cachedDiscovery.Add(mcd, 0, 0) - } - } - // Candidate builds always run with usage reporting. if opts := a.cfg.Options(); build.IsCandidate { @@ -341,7 +313,7 @@ func (a *App) startup() error { // GUI - if err := a.setupGUI(m, defaultSub, diskSub, cachedDiscovery, connectionsService, usageReportingSvc, errors, systemLog); err != nil { + if err := a.setupGUI(m, defaultSub, diskSub, discoveryManager, connectionsService, usageReportingSvc, errors, systemLog); err != nil { l.Warnln("Failed starting API:", err) return err } @@ -430,7 +402,7 @@ func (a *App) stopWithErr(stopReason ExitStatus, err error) ExitStatus { return a.exitStatus } -func (a *App) setupGUI(m model.Model, defaultSub, diskSub events.BufferedSubscription, discoverer discover.CachingMux, connectionsService connections.Service, urService *ur.Service, errors, systemLog logger.Recorder) error { +func (a *App) setupGUI(m model.Model, defaultSub, diskSub events.BufferedSubscription, discoverer discover.Manager, connectionsService connections.Service, urService *ur.Service, errors, systemLog logger.Recorder) error { guiCfg := a.cfg.GUI() if !guiCfg.Enabled { @@ -516,3 +488,7 @@ func printService(w io.Writer, svc interface{}, level int) { } } } + +type lateAddressLister struct { + discover.AddressLister +} diff --git a/lib/ur/usage_report.go b/lib/ur/usage_report.go index 3fbf794f0..30be1189c 100644 --- a/lib/ur/usage_report.go +++ b/lib/ur/usage_report.go @@ -206,8 +206,8 @@ func (s *Service) reportData(ctx context.Context, urVersion int, preview bool) ( report.UsesRateLimit = opts.MaxRecvKbps > 0 || opts.MaxSendKbps > 0 report.UpgradeAllowedManual = !(upgrade.DisabledByCompilation || s.noUpgrade) - report.UpgradeAllowedAuto = !(upgrade.DisabledByCompilation || s.noUpgrade) && opts.AutoUpgradeIntervalH > 0 - report.UpgradeAllowedPre = !(upgrade.DisabledByCompilation || s.noUpgrade) && opts.AutoUpgradeIntervalH > 0 && opts.UpgradeToPreReleases + report.UpgradeAllowedAuto = !(upgrade.DisabledByCompilation || s.noUpgrade) && opts.AutoUpgradeEnabled() + report.UpgradeAllowedPre = !(upgrade.DisabledByCompilation || s.noUpgrade) && opts.AutoUpgradeEnabled() && opts.UpgradeToPreReleases // V3