From 5c91723ef2878eb66ebf3695e8b2b41b52f421ad Mon Sep 17 00:00:00 2001 From: Simon Frei Date: Wed, 21 Oct 2020 11:51:53 +0200 Subject: [PATCH] lib/model: Handle index sender lifetime (fixes #7034) (#7038) --- lib/model/indexsender.go | 403 ++++++++++++++++++++++++++++++++++++ lib/model/model.go | 356 ++++++++----------------------- lib/model/model_test.go | 59 ++---- lib/model/requests_test.go | 119 +++++++++++ lib/model/testutils_test.go | 38 ++++ 5 files changed, 668 insertions(+), 307 deletions(-) create mode 100644 lib/model/indexsender.go diff --git a/lib/model/indexsender.go b/lib/model/indexsender.go new file mode 100644 index 000000000..7f66d7a2f --- /dev/null +++ b/lib/model/indexsender.go @@ -0,0 +1,403 @@ +// Copyright (C) 2020 The Syncthing Authors. +// +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this file, +// You can obtain one at https://mozilla.org/MPL/2.0/. + +package model + +import ( + "context" + "fmt" + "sync" + "time" + + "github.com/thejerf/suture" + + "github.com/syncthing/syncthing/lib/config" + "github.com/syncthing/syncthing/lib/db" + "github.com/syncthing/syncthing/lib/events" + "github.com/syncthing/syncthing/lib/protocol" + "github.com/syncthing/syncthing/lib/util" +) + +type indexSender struct { + suture.Service + conn protocol.Connection + folder string + dev string + fset *db.FileSet + prevSequence int64 + evLogger events.Logger + connClosed chan struct{} + token suture.ServiceToken + pauseChan chan struct{} + resumeChan chan *db.FileSet +} + +func (s *indexSender) serve(ctx context.Context) { + var err error + + l.Debugf("Starting indexSender for %s to %s at %s (slv=%d)", s.folder, s.dev, s.conn, s.prevSequence) + defer l.Debugf("Exiting indexSender for %s to %s at %s: %v", s.folder, s.dev, s.conn, err) + + // We need to send one index, regardless of whether there is something to send or not + err = s.sendIndexTo(ctx) + + // Subscribe to LocalIndexUpdated (we have new information to send) and + // DeviceDisconnected (it might be us who disconnected, so we should + // exit). + sub := s.evLogger.Subscribe(events.LocalIndexUpdated | events.DeviceDisconnected) + defer sub.Unsubscribe() + + paused := false + evChan := sub.C() + ticker := time.NewTicker(time.Minute) + defer ticker.Stop() + + for err == nil { + select { + case <-ctx.Done(): + return + case <-s.connClosed: + return + default: + } + + // While we have sent a sequence at least equal to the one + // currently in the database, wait for the local index to update. The + // local index may update for other folders than the one we are + // sending for. + if s.fset.Sequence(protocol.LocalDeviceID) <= s.prevSequence { + select { + case <-ctx.Done(): + return + case <-s.connClosed: + return + case <-evChan: + case <-ticker.C: + case <-s.pauseChan: + paused = true + case s.fset = <-s.resumeChan: + paused = false + } + + continue + } + + if !paused { + err = s.sendIndexTo(ctx) + } + + // Wait a short amount of time before entering the next loop. If there + // are continuous changes happening to the local index, this gives us + // time to batch them up a little. + time.Sleep(250 * time.Millisecond) + } +} + +// Complete implements the suture.IsCompletable interface. When Serve terminates +// before Stop is called, the supervisor will check for this method and if it +// returns true removes the service instead of restarting it. Here it always +// returns true, as indexSender only terminates when a connection is +// closed/has failed, in which case retrying doesn't help. +func (s *indexSender) Complete() bool { return true } + +func (s *indexSender) resume(fset *db.FileSet) { + select { + case <-s.connClosed: + case s.resumeChan <- fset: + } +} + +func (s *indexSender) pause() { + select { + case <-s.connClosed: + case s.pauseChan <- struct{}{}: + } +} + +// sendIndexTo sends file infos with a sequence number higher than prevSequence and +// returns the highest sent sequence number. +func (s *indexSender) sendIndexTo(ctx context.Context) error { + initial := s.prevSequence == 0 + batch := newFileInfoBatch(nil) + batch.flushFn = func(fs []protocol.FileInfo) error { + l.Debugf("%v: Sending %d files (<%d bytes)", s, len(batch.infos), batch.size) + if initial { + initial = false + return s.conn.Index(ctx, s.folder, fs) + } + return s.conn.IndexUpdate(ctx, s.folder, fs) + } + + var err error + var f protocol.FileInfo + snap := s.fset.Snapshot() + defer snap.Release() + previousWasDelete := false + snap.WithHaveSequence(s.prevSequence+1, func(fi protocol.FileIntf) bool { + // This is to make sure that renames (which is an add followed by a delete) land in the same batch. + // Even if the batch is full, we allow a last delete to slip in, we do this by making sure that + // the batch ends with a non-delete, or that the last item in the batch is already a delete + if batch.full() && (!fi.IsDeleted() || previousWasDelete) { + if err = batch.flush(); err != nil { + return false + } + } + + if shouldDebug() { + if fi.SequenceNo() < s.prevSequence+1 { + panic(fmt.Sprintln("sequence lower than requested, got:", fi.SequenceNo(), ", asked to start at:", s.prevSequence+1)) + } + } + + if f.Sequence > 0 && fi.SequenceNo() <= f.Sequence { + l.Warnln("Non-increasing sequence detected: Checking and repairing the db...") + // Abort this round of index sending - the next one will pick + // up from the last successful one with the repeaired db. + defer func() { + if fixed, dbErr := s.fset.RepairSequence(); dbErr != nil { + l.Warnln("Failed repairing sequence entries:", dbErr) + panic("Failed repairing sequence entries") + } else { + s.evLogger.Log(events.Failure, "detected and repaired non-increasing sequence") + l.Infof("Repaired %v sequence entries in database", fixed) + } + }() + return false + } + + f = fi.(protocol.FileInfo) + + // Mark the file as invalid if any of the local bad stuff flags are set. + f.RawInvalid = f.IsInvalid() + // If the file is marked LocalReceive (i.e., changed locally on a + // receive only folder) we do not want it to ever become the + // globally best version, invalid or not. + if f.IsReceiveOnlyChanged() { + f.Version = protocol.Vector{} + } + + // never sent externally + f.LocalFlags = 0 + f.VersionHash = nil + + previousWasDelete = f.IsDeleted() + + batch.append(f) + return true + }) + if err != nil { + return err + } + + err = batch.flush() + + // True if there was nothing to be sent + if f.Sequence == 0 { + return err + } + + s.prevSequence = f.Sequence + return err +} + +func (s *indexSender) String() string { + return fmt.Sprintf("indexSender@%p for %s to %s at %s", s, s.folder, s.dev, s.conn) +} + +type indexSenderRegistry struct { + deviceID protocol.DeviceID + sup *suture.Supervisor + evLogger events.Logger + conn protocol.Connection + closed chan struct{} + indexSenders map[string]*indexSender + startInfos map[string]*indexSenderStartInfo + mut sync.Mutex +} + +func newIndexSenderRegistry(conn protocol.Connection, closed chan struct{}, sup *suture.Supervisor, evLogger events.Logger) *indexSenderRegistry { + return &indexSenderRegistry{ + deviceID: conn.ID(), + conn: conn, + closed: closed, + sup: sup, + evLogger: evLogger, + indexSenders: make(map[string]*indexSender), + startInfos: make(map[string]*indexSenderStartInfo), + mut: sync.Mutex{}, + } +} + +// add starts an index sender for given folder. +// If an index sender is already running, it will be stopped first. +func (r *indexSenderRegistry) add(folder config.FolderConfiguration, fset *db.FileSet, local, remote protocol.Device) { + r.mut.Lock() + r.addLocked(folder, fset, remote, local) + r.mut.Unlock() +} + +func (r *indexSenderRegistry) addLocked(folder config.FolderConfiguration, fset *db.FileSet, local, remote protocol.Device) { + if is, ok := r.indexSenders[folder.ID]; ok { + r.sup.RemoveAndWait(is.token, 0) + delete(r.indexSenders, folder.ID) + } + if _, ok := r.startInfos[folder.ID]; ok { + delete(r.startInfos, folder.ID) + } + + myIndexID := fset.IndexID(protocol.LocalDeviceID) + mySequence := fset.Sequence(protocol.LocalDeviceID) + var startSequence int64 + + // This is the other side's description of what it knows + // about us. Lets check to see if we can start sending index + // updates directly or need to send the index from start... + + if local.IndexID == myIndexID { + // They say they've seen our index ID before, so we can + // send a delta update only. + + if local.MaxSequence > mySequence { + // Safety check. They claim to have more or newer + // index data than we have - either we have lost + // index data, or reset the index without resetting + // the IndexID, or something else weird has + // happened. We send a full index to reset the + // situation. + l.Infof("Device %v folder %s is delta index compatible, but seems out of sync with reality", r.deviceID, folder.Description()) + startSequence = 0 + } else { + l.Debugf("Device %v folder %s is delta index compatible (mlv=%d)", r.deviceID, folder.Description(), local.MaxSequence) + startSequence = local.MaxSequence + } + } else if local.IndexID != 0 { + // They say they've seen an index ID from us, but it's + // not the right one. Either they are confused or we + // must have reset our database since last talking to + // them. We'll start with a full index transfer. + l.Infof("Device %v folder %s has mismatching index ID for us (%v != %v)", r.deviceID, folder.Description(), local.IndexID, myIndexID) + startSequence = 0 + } + + // This is the other side's description of themselves. We + // check to see that it matches the IndexID we have on file, + // otherwise we drop our old index data and expect to get a + // completely new set. + + theirIndexID := fset.IndexID(r.deviceID) + if remote.IndexID == 0 { + // They're not announcing an index ID. This means they + // do not support delta indexes and we should clear any + // information we have from them before accepting their + // index, which will presumably be a full index. + fset.Drop(r.deviceID) + } else if remote.IndexID != theirIndexID { + // The index ID we have on file is not what they're + // announcing. They must have reset their database and + // will probably send us a full index. We drop any + // information we have and remember this new index ID + // instead. + l.Infof("Device %v folder %s has a new index ID (%v)", r.deviceID, folder.Description(), remote.IndexID) + fset.Drop(r.deviceID) + fset.SetIndexID(r.deviceID, remote.IndexID) + } + + is := &indexSender{ + conn: r.conn, + connClosed: r.closed, + folder: folder.ID, + fset: fset, + prevSequence: startSequence, + evLogger: r.evLogger, + pauseChan: make(chan struct{}), + resumeChan: make(chan *db.FileSet), + } + is.Service = util.AsService(is.serve, is.String()) + is.token = r.sup.Add(is) + r.indexSenders[folder.ID] = is +} + +// addPaused stores the given info to start an index sender once resume is called +// for this folder. +// If an index sender is already running, it will be stopped. +func (r *indexSenderRegistry) addPaused(folder config.FolderConfiguration, local, remote protocol.Device) { + r.mut.Lock() + defer r.mut.Unlock() + + if is, ok := r.indexSenders[folder.ID]; ok { + r.sup.RemoveAndWait(is.token, 0) + delete(r.indexSenders, folder.ID) + } + r.startInfos[folder.ID] = &indexSenderStartInfo{local, remote} +} + +// remove stops a running index sender or removes one pending to be started. +// It is a noop if the folder isn't known. +func (r *indexSenderRegistry) remove(folder string) { + r.mut.Lock() + defer r.mut.Unlock() + + if is, ok := r.indexSenders[folder]; ok { + r.sup.RemoveAndWait(is.token, 0) + delete(r.indexSenders, folder) + } + delete(r.startInfos, folder) +} + +// removeAllExcept stops all running index senders and removes those pending to be started, +// except mentioned ones. +// It is a noop if the folder isn't known. +func (r *indexSenderRegistry) removeAllExcept(except map[string]struct{}) { + r.mut.Lock() + defer r.mut.Unlock() + + for folder, is := range r.indexSenders { + if _, ok := except[folder]; !ok { + r.sup.RemoveAndWait(is.token, 0) + delete(r.indexSenders, folder) + } + } + for folder := range r.indexSenders { + if _, ok := except[folder]; !ok { + delete(r.startInfos, folder) + } + } +} + +// pause stops a running index sender. +// It is a noop if the folder isn't known or has not been started yet. +func (r *indexSenderRegistry) pause(folder string) { + r.mut.Lock() + defer r.mut.Unlock() + + if is, ok := r.indexSenders[folder]; ok { + is.pause() + } +} + +// resume unpauses an already running index sender or starts it, if it was added +// while paused. +// It is a noop if the folder isn't known. +func (r *indexSenderRegistry) resume(folder config.FolderConfiguration, fset *db.FileSet) { + r.mut.Lock() + defer r.mut.Unlock() + + is, isOk := r.indexSenders[folder.ID] + if info, ok := r.startInfos[folder.ID]; ok { + if isOk { + r.sup.RemoveAndWait(is.token, 0) + delete(r.indexSenders, folder.ID) + } + r.addLocked(folder, fset, info.local, info.remote) + delete(r.startInfos, folder.ID) + } else if isOk { + is.resume(fset) + } +} + +type indexSenderStartInfo struct { + local, remote protocol.Device +} diff --git a/lib/model/model.go b/lib/model/model.go index 06c06ee89..6429dadb9 100644 --- a/lib/model/model.go +++ b/lib/model/model.go @@ -35,7 +35,6 @@ import ( "github.com/syncthing/syncthing/lib/stats" "github.com/syncthing/syncthing/lib/sync" "github.com/syncthing/syncthing/lib/ur/contract" - "github.com/syncthing/syncthing/lib/util" "github.com/syncthing/syncthing/lib/versioner" ) @@ -149,8 +148,8 @@ type model struct { closed map[protocol.DeviceID]chan struct{} helloMessages map[protocol.DeviceID]protocol.Hello deviceDownloads map[protocol.DeviceID]*deviceDownloadState - remotePausedFolders map[protocol.DeviceID][]string // deviceID -> folders - indexSenderTokens map[protocol.DeviceID][]suture.ServiceToken + remotePausedFolders map[protocol.DeviceID]map[string]struct{} // deviceID -> folders + indexSenders map[protocol.DeviceID]*indexSenderRegistry foldersRunning int32 // for testing only } @@ -172,9 +171,11 @@ var ( errNetworkNotAllowed = errors.New("network not allowed") errNoVersioner = errors.New("folder has no versioner") // errors about why a connection is closed - errIgnoredFolderRemoved = errors.New("folder no longer ignored") - errReplacingConnection = errors.New("replacing connection") - errStopped = errors.New("Syncthing is being stopped") + errIgnoredFolderRemoved = errors.New("folder no longer ignored") + errReplacingConnection = errors.New("replacing connection") + errStopped = errors.New("Syncthing is being stopped") + errMissingRemoteInClusterConfig = errors.New("remote device missing in cluster config") + errMissingLocalInClusterConfig = errors.New("local device missing in cluster config") ) // NewModel creates and starts a new model. The model starts in read-only mode, @@ -222,8 +223,8 @@ func NewModel(cfg config.Wrapper, id protocol.DeviceID, clientName, clientVersio closed: make(map[protocol.DeviceID]chan struct{}), helloMessages: make(map[protocol.DeviceID]protocol.Hello), deviceDownloads: make(map[protocol.DeviceID]*deviceDownloadState), - remotePausedFolders: make(map[protocol.DeviceID][]string), - indexSenderTokens: make(map[protocol.DeviceID][]suture.ServiceToken), + remotePausedFolders: make(map[protocol.DeviceID]map[string]struct{}), + indexSenders: make(map[protocol.DeviceID]*indexSenderRegistry), } for devID := range cfg.Devices() { m.deviceStatRefs[devID] = stats.NewDeviceStatisticsReference(m.db, devID.String()) @@ -405,7 +406,10 @@ func (m *model) removeFolder(cfg config.FolderConfiguration) { m.RemoveAndWait(token, 0) } + // We need to hold both fmut and pmut and must acquire locks in the same + // order always. (The locks can be *released* in any order.) m.fmut.Lock() + m.pmut.RLock() isPathUnique := true for folderID, folderCfg := range m.folderCfgs { @@ -420,8 +424,12 @@ func (m *model) removeFolder(cfg config.FolderConfiguration) { } m.cleanupFolderLocked(cfg) + for _, r := range m.indexSenders { + r.remove(cfg.ID) + } m.fmut.Unlock() + m.pmut.RUnlock() // Remove it from the database db.DropFolder(m.db, cfg.ID) @@ -472,14 +480,32 @@ func (m *model) restartFolder(from, to config.FolderConfiguration, cacheIgnoredF fset := m.folderFiles[folder] m.cleanupFolderLocked(from) - if !to.Paused { - if fset == nil { + if to.Paused { + // Care needs to be taken because we already hold fmut and the lock order + // must be the same everywhere. As fmut is acquired first, this is fine. + m.pmut.RLock() + for _, r := range m.indexSenders { + r.pause(to.ID) + } + m.pmut.RUnlock() + } else { + fsetNil := fset == nil + if fsetNil { // Create a new fset. Might take a while and we do it under // locking, but it's unsafe to create fset:s concurrently so // that's the price we pay. fset = db.NewFileSet(folder, to.Filesystem(), m.db) } m.addAndStartFolderLocked(to, fset, cacheIgnoredFiles) + if fsetNil || from.Paused { + for _, devID := range to.DeviceIDs() { + indexSenders, ok := m.indexSenders[devID] + if !ok { + continue + } + indexSenders.resume(to, fset) + } + } } var infoMsg string @@ -979,11 +1005,7 @@ func (m *model) ClusterConfig(deviceID protocol.DeviceID, cm protocol.ClusterCon tempIndexFolders := make([]string, 0, len(cm.Folders)) m.pmut.RLock() - conn, ok := m.conn[deviceID] - closed := m.closed[deviceID] - for _, token := range m.indexSenderTokens[deviceID] { - m.RemoveAndWait(token, 0) - } + indexSenderRegistry, ok := m.indexSenders[deviceID] m.pmut.RUnlock() if !ok { panic("bug: ClusterConfig called on closed or nonexistent connection") @@ -1015,11 +1037,14 @@ func (m *model) ClusterConfig(deviceID protocol.DeviceID, cm protocol.ClusterCon } } - var paused []string - indexSenderTokens := make([]suture.ServiceToken, 0, len(cm.Folders)) + paused := make(map[string]struct{}, len(cm.Folders)) + seenFolders := make(map[string]struct{}, len(cm.Folders)) for _, folder := range cm.Folders { + seenFolders[folder.ID] = struct{}{} + cfg, ok := m.cfg.Folder(folder.ID) if !ok || !cfg.SharedWith(deviceID) { + indexSenderRegistry.remove(folder.ID) if deviceCfg.IgnoredFolder(folder.ID) { l.Infof("Ignoring folder %s from device %s since we are configured to", folder.Description(), deviceID) continue @@ -1034,13 +1059,41 @@ func (m *model) ClusterConfig(deviceID protocol.DeviceID, cm protocol.ClusterCon l.Infof("Unexpected folder %s sent from device %q; ensure that the folder exists and that this device is selected under \"Share With\" in the folder configuration.", folder.Description(), deviceID) continue } + + var foundRemote, foundLocal bool + var remoteDeviceInfo, localDeviceInfo protocol.Device + for _, dev := range folder.Devices { + if dev.ID == m.id { + localDeviceInfo = dev + foundLocal = true + } else if dev.ID == deviceID { + remoteDeviceInfo = dev + foundRemote = true + } + if foundRemote && foundLocal { + break + } + } + if !foundRemote { + l.Infof("Device %v sent cluster-config without the device info for the remote on folder %v", deviceID, folder.Description()) + return errMissingRemoteInClusterConfig + } + if !foundLocal { + l.Infof("Device %v sent cluster-config without the device info for us locally on folder %v", deviceID, folder.Description()) + return errMissingLocalInClusterConfig + } + if folder.Paused { - paused = append(paused, folder.ID) + indexSenderRegistry.remove(folder.ID) + paused[cfg.ID] = struct{}{} continue } + if cfg.Paused { + indexSenderRegistry.addPaused(cfg, localDeviceInfo, remoteDeviceInfo) continue } + m.fmut.RLock() fs, ok := m.folderFiles[folder.ID] m.fmut.RUnlock() @@ -1054,93 +1107,21 @@ func (m *model) ClusterConfig(deviceID protocol.DeviceID, cm protocol.ClusterCon tempIndexFolders = append(tempIndexFolders, folder.ID) } - myIndexID := fs.IndexID(protocol.LocalDeviceID) - mySequence := fs.Sequence(protocol.LocalDeviceID) - var startSequence int64 + indexSenderRegistry.add(cfg, fs, localDeviceInfo, remoteDeviceInfo) - for _, dev := range folder.Devices { - if dev.ID == m.id { - // This is the other side's description of what it knows - // about us. Lets check to see if we can start sending index - // updates directly or need to send the index from start... - - if dev.IndexID == myIndexID { - // They say they've seen our index ID before, so we can - // send a delta update only. - - if dev.MaxSequence > mySequence { - // Safety check. They claim to have more or newer - // index data than we have - either we have lost - // index data, or reset the index without resetting - // the IndexID, or something else weird has - // happened. We send a full index to reset the - // situation. - l.Infof("Device %v folder %s is delta index compatible, but seems out of sync with reality", deviceID, folder.Description()) - startSequence = 0 - continue - } - - l.Debugf("Device %v folder %s is delta index compatible (mlv=%d)", deviceID, folder.Description(), dev.MaxSequence) - startSequence = dev.MaxSequence - } else if dev.IndexID != 0 { - // They say they've seen an index ID from us, but it's - // not the right one. Either they are confused or we - // must have reset our database since last talking to - // them. We'll start with a full index transfer. - l.Infof("Device %v folder %s has mismatching index ID for us (%v != %v)", deviceID, folder.Description(), dev.IndexID, myIndexID) - startSequence = 0 - } - } else if dev.ID == deviceID { - // This is the other side's description of themselves. We - // check to see that it matches the IndexID we have on file, - // otherwise we drop our old index data and expect to get a - // completely new set. - - theirIndexID := fs.IndexID(deviceID) - if dev.IndexID == 0 { - // They're not announcing an index ID. This means they - // do not support delta indexes and we should clear any - // information we have from them before accepting their - // index, which will presumably be a full index. - fs.Drop(deviceID) - } else if dev.IndexID != theirIndexID { - // The index ID we have on file is not what they're - // announcing. They must have reset their database and - // will probably send us a full index. We drop any - // information we have and remember this new index ID - // instead. - l.Infof("Device %v folder %s has a new index ID (%v)", deviceID, folder.Description(), dev.IndexID) - fs.Drop(deviceID) - fs.SetIndexID(deviceID, dev.IndexID) - } else { - // They're sending a recognized index ID and will most - // likely use delta indexes. We might already have files - // that we need to pull so let the folder runner know - // that it should recheck the index data. - m.fmut.RLock() - if runner := m.folderRunners[folder.ID]; runner != nil { - defer runner.SchedulePull() - } - m.fmut.RUnlock() - } - } + // We might already have files that we need to pull so let the + // folder runner know that it should recheck the index data. + m.fmut.RLock() + if runner := m.folderRunners[folder.ID]; runner != nil { + defer runner.SchedulePull() } - - is := &indexSender{ - conn: conn, - connClosed: closed, - folder: folder.ID, - fset: fs, - prevSequence: startSequence, - evLogger: m.evLogger, - } - is.Service = util.AsService(is.serve, is.String()) - indexSenderTokens = append(indexSenderTokens, m.Add(is)) + m.fmut.RUnlock() } + indexSenderRegistry.removeAllExcept(seenFolders) + m.pmut.Lock() m.remotePausedFolders[deviceID] = paused - m.indexSenderTokens[deviceID] = indexSenderTokens m.pmut.Unlock() // This breaks if we send multiple CM messages during the same connection. @@ -1376,6 +1357,7 @@ func (m *model) Closed(conn protocol.Connection, err error) { delete(m.remotePausedFolders, device) closed := m.closed[device] delete(m.closed, device) + delete(m.indexSenders, device) m.pmut.Unlock() m.progressEmitter.temporaryIndexUnsubscribe(conn) @@ -1779,8 +1761,10 @@ func (m *model) AddConnection(conn connections.Connection, hello protocol.Hello) } m.conn[deviceID] = conn - m.closed[deviceID] = make(chan struct{}) + closed := make(chan struct{}) + m.closed[deviceID] = closed m.deviceDownloads[deviceID] = newDeviceDownloadState() + m.indexSenders[deviceID] = newIndexSenderRegistry(conn, closed, m.Supervisor, m.evLogger) // 0: default, <0: no limiting switch { case device.MaxRequestKiB > 0: @@ -1857,168 +1841,6 @@ func (m *model) deviceWasSeen(deviceID protocol.DeviceID) { } } -type indexSender struct { - suture.Service - conn protocol.Connection - folder string - dev string - fset *db.FileSet - prevSequence int64 - evLogger events.Logger - connClosed chan struct{} -} - -func (s *indexSender) serve(ctx context.Context) { - var err error - - l.Debugf("Starting indexSender for %s to %s at %s (slv=%d)", s.folder, s.dev, s.conn, s.prevSequence) - defer l.Debugf("Exiting indexSender for %s to %s at %s: %v", s.folder, s.dev, s.conn, err) - - // We need to send one index, regardless of whether there is something to send or not - err = s.sendIndexTo(ctx) - - // Subscribe to LocalIndexUpdated (we have new information to send) and - // DeviceDisconnected (it might be us who disconnected, so we should - // exit). - sub := s.evLogger.Subscribe(events.LocalIndexUpdated | events.DeviceDisconnected) - defer sub.Unsubscribe() - - evChan := sub.C() - ticker := time.NewTicker(time.Minute) - defer ticker.Stop() - - for err == nil { - select { - case <-ctx.Done(): - return - case <-s.connClosed: - return - default: - } - - // While we have sent a sequence at least equal to the one - // currently in the database, wait for the local index to update. The - // local index may update for other folders than the one we are - // sending for. - if s.fset.Sequence(protocol.LocalDeviceID) <= s.prevSequence { - select { - case <-ctx.Done(): - return - case <-s.connClosed: - return - case <-evChan: - case <-ticker.C: - } - - continue - } - - err = s.sendIndexTo(ctx) - - // Wait a short amount of time before entering the next loop. If there - // are continuous changes happening to the local index, this gives us - // time to batch them up a little. - time.Sleep(250 * time.Millisecond) - } -} - -// Complete implements the suture.IsCompletable interface. When Serve terminates -// before Stop is called, the supervisor will check for this method and if it -// returns true removes the service instead of restarting it. Here it always -// returns true, as indexSender only terminates when a connection is -// closed/has failed, in which case retrying doesn't help. -func (s *indexSender) Complete() bool { return true } - -// sendIndexTo sends file infos with a sequence number higher than prevSequence and -// returns the highest sent sequence number. -func (s *indexSender) sendIndexTo(ctx context.Context) error { - initial := s.prevSequence == 0 - batch := newFileInfoBatch(nil) - batch.flushFn = func(fs []protocol.FileInfo) error { - l.Debugf("%v: Sending %d files (<%d bytes)", s, len(batch.infos), batch.size) - if initial { - initial = false - return s.conn.Index(ctx, s.folder, fs) - } - return s.conn.IndexUpdate(ctx, s.folder, fs) - } - - var err error - var f protocol.FileInfo - snap := s.fset.Snapshot() - defer snap.Release() - previousWasDelete := false - snap.WithHaveSequence(s.prevSequence+1, func(fi protocol.FileIntf) bool { - // This is to make sure that renames (which is an add followed by a delete) land in the same batch. - // Even if the batch is full, we allow a last delete to slip in, we do this by making sure that - // the batch ends with a non-delete, or that the last item in the batch is already a delete - if batch.full() && (!fi.IsDeleted() || previousWasDelete) { - if err = batch.flush(); err != nil { - return false - } - } - - if shouldDebug() { - if fi.SequenceNo() < s.prevSequence+1 { - panic(fmt.Sprintln("sequence lower than requested, got:", fi.SequenceNo(), ", asked to start at:", s.prevSequence+1)) - } - } - - if f.Sequence > 0 && fi.SequenceNo() <= f.Sequence { - l.Warnln("Non-increasing sequence detected: Checking and repairing the db...") - // Abort this round of index sending - the next one will pick - // up from the last successful one with the repeaired db. - defer func() { - if fixed, dbErr := s.fset.RepairSequence(); dbErr != nil { - l.Warnln("Failed repairing sequence entries:", dbErr) - panic("Failed repairing sequence entries") - } else { - s.evLogger.Log(events.Failure, "detected and repaired non-increasing sequence") - l.Infof("Repaired %v sequence entries in database", fixed) - } - }() - return false - } - - f = fi.(protocol.FileInfo) - - // Mark the file as invalid if any of the local bad stuff flags are set. - f.RawInvalid = f.IsInvalid() - // If the file is marked LocalReceive (i.e., changed locally on a - // receive only folder) we do not want it to ever become the - // globally best version, invalid or not. - if f.IsReceiveOnlyChanged() { - f.Version = protocol.Vector{} - } - - // never sent externally - f.LocalFlags = 0 - f.VersionHash = nil - - previousWasDelete = f.IsDeleted() - - batch.append(f) - return true - }) - if err != nil { - return err - } - - err = batch.flush() - - // True if there was nothing to be sent - if f.Sequence == 0 { - return err - } - - s.prevSequence = f.Sequence - return err -} - -func (s *indexSender) String() string { - return fmt.Sprintf("indexSender@%p for %s to %s at %s", s, s.folder, s.dev, s.conn) -} - func (m *model) requestGlobal(ctx context.Context, deviceID protocol.DeviceID, folder, name string, offset int64, size int, hash []byte, weakHash uint32, fromTemporary bool) ([]byte, error) { m.pmut.RLock() nc, ok := m.conn[deviceID] @@ -2373,12 +2195,12 @@ func (m *model) Availability(folder string, file protocol.FileInfo, block protoc var availabilities []Availability snap := fs.Snapshot() defer snap.Release() -next: for _, device := range snap.Availability(file.Name) { - for _, pausedFolder := range m.remotePausedFolders[device] { - if pausedFolder == folder { - continue next - } + if _, ok := m.remotePausedFolders[device]; !ok { + continue + } + if _, ok := m.remotePausedFolders[device][folder]; ok { + continue } _, ok := m.conn[device] if ok { diff --git a/lib/model/model_test.go b/lib/model/model_test.go index 25d33f42d..8458d6a20 100644 --- a/lib/model/model_test.go +++ b/lib/model/model_test.go @@ -496,23 +496,16 @@ func TestIntroducer(t *testing.T) { }, }, }) - m.ClusterConfig(device1, protocol.ClusterConfig{ - Folders: []protocol.Folder{ - { - ID: "folder1", - Devices: []protocol.Device{ - { - ID: device2, - Introducer: true, - SkipIntroductionRemovals: true, - }, - }, - }, - }, + cc := basicClusterConfig(myID, device1, "folder1") + cc.Folders[0].Devices = append(cc.Folders[0].Devices, protocol.Device{ + ID: device2, + Introducer: true, + SkipIntroductionRemovals: true, }) + m.ClusterConfig(device1, cc) if newDev, ok := m.cfg.Device(device2); !ok || !newDev.Introducer || !newDev.SkipIntroductionRemovals { - t.Error("devie 2 missing or wrong flags") + t.Error("device 2 missing or wrong flags") } if !contains(m.cfg.Folders()["folder1"], device2, device1) { @@ -549,20 +542,13 @@ func TestIntroducer(t *testing.T) { }, }, }) - m.ClusterConfig(device1, protocol.ClusterConfig{ - Folders: []protocol.Folder{ - { - ID: "folder2", - Devices: []protocol.Device{ - { - ID: device2, - Introducer: true, - SkipIntroductionRemovals: true, - }, - }, - }, - }, + cc = basicClusterConfig(myID, device1, "folder2") + cc.Folders[0].Devices = append(cc.Folders[0].Devices, protocol.Device{ + ID: device2, + Introducer: true, + SkipIntroductionRemovals: true, }) + m.ClusterConfig(device1, cc) // Should not get introducer, as it's already unset, and it's an existing device. if newDev, ok := m.cfg.Device(device2); !ok || newDev.Introducer || newDev.SkipIntroductionRemovals { @@ -703,20 +689,13 @@ func TestIntroducer(t *testing.T) { }, }, }) - m.ClusterConfig(device1, protocol.ClusterConfig{ - Folders: []protocol.Folder{ - { - ID: "folder2", - Devices: []protocol.Device{ - { - ID: device2, - Introducer: true, - SkipIntroductionRemovals: true, - }, - }, - }, - }, + cc = basicClusterConfig(myID, device1, "folder2") + cc.Folders[0].Devices = append(cc.Folders[0].Devices, protocol.Device{ + ID: device2, + Introducer: true, + SkipIntroductionRemovals: true, }) + m.ClusterConfig(device1, cc) if _, ok := m.cfg.Device(device2); !ok { t.Error("device 2 should not have been removed") diff --git a/lib/model/requests_test.go b/lib/model/requests_test.go index ee2cf2895..bdbdc0f77 100644 --- a/lib/model/requests_test.go +++ b/lib/model/requests_test.go @@ -1147,3 +1147,122 @@ func TestRequestLastFileProgress(t *testing.T) { t.Fatal("Timed out before file was requested") } } + +func TestRequestIndexSenderPause(t *testing.T) { + m, fc, fcfg := setupModelWithConnection() + tfs := fcfg.Filesystem() + defer cleanupModelAndRemoveDir(m, tfs.URI()) + + indexChan := make(chan []protocol.FileInfo) + fc.mut.Lock() + fc.indexFn = func(_ context.Context, folder string, fs []protocol.FileInfo) { + indexChan <- fs + } + fc.mut.Unlock() + + var seq int64 = 1 + files := []protocol.FileInfo{{Name: "foo", Size: 10, Version: protocol.Vector{}.Update(myID.Short()), Sequence: seq}} + + // Both devices connected, noone paused + localIndexUpdate(m, fcfg.ID, files) + select { + case <-time.After(5 * time.Second): + l.Infoln("timeout") + t.Fatal("timed out before receiving index") + case <-indexChan: + } + + // Remote paused + + cc := basicClusterConfig(device1, myID, fcfg.ID) + cc.Folders[0].Paused = true + m.ClusterConfig(device1, cc) + + seq++ + files[0].Sequence = seq + files[0].Version = files[0].Version.Update(myID.Short()) + localIndexUpdate(m, fcfg.ID, files) + + // I don't see what to hook into to ensure an index update is not sent. + dur := 50 * time.Millisecond + if !testing.Short() { + dur = 2 * time.Second + } + select { + case <-time.After(dur): + case <-indexChan: + t.Error("Received index despite remote being paused") + } + + // Remote unpaused + + cc.Folders[0].Paused = false + m.ClusterConfig(device1, cc) + select { + case <-time.After(5 * time.Second): + t.Fatal("timed out before receiving index") + case <-indexChan: + } + + // Local paused and resume + + fcfg.Paused = true + waiter, _ := m.cfg.SetFolder(fcfg) + waiter.Wait() + + fcfg.Paused = false + waiter, _ = m.cfg.SetFolder(fcfg) + waiter.Wait() + + seq++ + files[0].Sequence = seq + files[0].Version = files[0].Version.Update(myID.Short()) + localIndexUpdate(m, fcfg.ID, files) + select { + case <-time.After(5 * time.Second): + t.Fatal("timed out before receiving index") + case <-indexChan: + } + + // Local and remote paused, then first resume remote, then local + + cc.Folders[0].Paused = true + m.ClusterConfig(device1, cc) + + fcfg.Paused = true + waiter, _ = m.cfg.SetFolder(fcfg) + waiter.Wait() + + cc.Folders[0].Paused = false + m.ClusterConfig(device1, cc) + + fcfg.Paused = false + waiter, _ = m.cfg.SetFolder(fcfg) + waiter.Wait() + + seq++ + files[0].Sequence = seq + files[0].Version = files[0].Version.Update(myID.Short()) + localIndexUpdate(m, fcfg.ID, files) + select { + case <-time.After(5 * time.Second): + t.Fatal("timed out before receiving index") + case <-indexChan: + } + + // Folder removed on remote + + cc = protocol.ClusterConfig{} + m.ClusterConfig(device1, cc) + + seq++ + files[0].Sequence = seq + files[0].Version = files[0].Version.Update(myID.Short()) + localIndexUpdate(m, fcfg.ID, files) + + select { + case <-time.After(dur): + case <-indexChan: + t.Error("Received index despite remote not having the folder") + } +} diff --git a/lib/model/testutils_test.go b/lib/model/testutils_test.go index e4187c281..7980866f5 100644 --- a/lib/model/testutils_test.go +++ b/lib/model/testutils_test.go @@ -230,3 +230,41 @@ func folderIgnoresAlwaysReload(m *model, fcfg config.FolderConfiguration) { m.addAndStartFolderLockedWithIgnores(fcfg, fset, ignores) m.fmut.Unlock() } + +func basicClusterConfig(local, remote protocol.DeviceID, folders ...string) protocol.ClusterConfig { + var cc protocol.ClusterConfig + for _, folder := range folders { + cc.Folders = append(cc.Folders, protocol.Folder{ + ID: folder, + Devices: []protocol.Device{ + { + ID: local, + }, + { + ID: remote, + }, + }, + }) + } + return cc +} + +func localIndexUpdate(m *model, folder string, fs []protocol.FileInfo) { + m.fmut.RLock() + fset := m.folderFiles[folder] + m.fmut.RUnlock() + + fset.Update(protocol.LocalDeviceID, fs) + seq := fset.Sequence(protocol.LocalDeviceID) + filenames := make([]string, len(fs)) + for i, file := range fs { + filenames[i] = file.Name + } + m.evLogger.Log(events.LocalIndexUpdated, map[string]interface{}{ + "folder": folder, + "items": len(fs), + "filenames": filenames, + "sequence": seq, + "version": seq, // legacy for sequence + }) +}