diff --git a/lib/model/indexsender.go b/lib/model/indexsender.go index 723312615..6ebe77164 100644 --- a/lib/model/indexsender.go +++ b/lib/model/indexsender.go @@ -242,12 +242,6 @@ func (r *indexSenderRegistry) add(folder config.FolderConfiguration, fset *db.Fi r.mut.Unlock() } -func (r *indexSenderRegistry) addNew(folder config.FolderConfiguration, fset *db.FileSet) { - r.mut.Lock() - r.startLocked(folder.ID, fset, 0) - r.mut.Unlock() -} - func (r *indexSenderRegistry) addLocked(folder config.FolderConfiguration, fset *db.FileSet, startInfo *indexSenderStartInfo) { myIndexID := fset.IndexID(protocol.LocalDeviceID) mySequence := fset.Sequence(protocol.LocalDeviceID) @@ -282,7 +276,7 @@ func (r *indexSenderRegistry) addLocked(folder config.FolderConfiguration, fset l.Infof("Device %v folder %s has mismatching index ID for us (%v != %v)", r.deviceID, folder.Description(), startInfo.local.IndexID, myIndexID) startSequence = 0 } else { - l.Debugf("Device %v folder %s is not delta index compatible", r.deviceID, folder.Description()) + l.Debugf("Device %v folder %s has no index ID for us", r.deviceID, folder.Description()) } // This is the other side's description of themselves. We @@ -296,6 +290,7 @@ func (r *indexSenderRegistry) addLocked(folder config.FolderConfiguration, fset // do not support delta indexes and we should clear any // information we have from them before accepting their // index, which will presumably be a full index. + l.Debugf("Device %v folder %s does not announce an index ID", r.deviceID, folder.Description()) fset.Drop(r.deviceID) } else if startInfo.remote.IndexID != theirIndexID { // The index ID we have on file is not what they're @@ -308,22 +303,18 @@ func (r *indexSenderRegistry) addLocked(folder config.FolderConfiguration, fset fset.SetIndexID(r.deviceID, startInfo.remote.IndexID) } - r.startLocked(folder.ID, fset, startSequence) -} - -func (r *indexSenderRegistry) startLocked(folderID string, fset *db.FileSet, startSequence int64) { - if is, ok := r.indexSenders[folderID]; ok { + if is, ok := r.indexSenders[folder.ID]; ok { r.sup.RemoveAndWait(is.token, 0) - delete(r.indexSenders, folderID) + delete(r.indexSenders, folder.ID) } - if _, ok := r.startInfos[folderID]; ok { - delete(r.startInfos, folderID) + if _, ok := r.startInfos[folder.ID]; ok { + delete(r.startInfos, folder.ID) } is := &indexSender{ conn: r.conn, connClosed: r.closed, - folder: folderID, + folder: folder.ID, fset: fset, prevSequence: startSequence, evLogger: r.evLogger, @@ -331,13 +322,13 @@ func (r *indexSenderRegistry) startLocked(folderID string, fset *db.FileSet, sta resumeChan: make(chan *db.FileSet), } is.token = r.sup.Add(is) - r.indexSenders[folderID] = is + r.indexSenders[folder.ID] = is } -// addPaused stores the given info to start an index sender once resume is called +// addPending stores the given info to start an index sender once resume is called // for this folder. // If an index sender is already running, it will be stopped. -func (r *indexSenderRegistry) addPaused(folder config.FolderConfiguration, startInfo *indexSenderStartInfo) { +func (r *indexSenderRegistry) addPending(folder config.FolderConfiguration, startInfo *indexSenderStartInfo) { r.mut.Lock() defer r.mut.Unlock() diff --git a/lib/model/model.go b/lib/model/model.go index d1769584b..deb05e177 100644 --- a/lib/model/model.go +++ b/lib/model/model.go @@ -254,13 +254,16 @@ func NewModel(cfg config.Wrapper, id protocol.DeviceID, clientName, clientVersio func (m *model) serve(ctx context.Context) error { // Add and start folders cacheIgnoredFiles := m.cfg.Options().CacheIgnoredFiles + clusterConfigDevices := make(deviceIDSet, len(m.cfg.Devices())) for _, folderCfg := range m.cfg.Folders() { if folderCfg.Paused { folderCfg.CreateRoot() continue } m.newFolder(folderCfg, cacheIgnoredFiles) + clusterConfigDevices.add(folderCfg.DeviceIDs()) } + m.resendClusterConfig(clusterConfigDevices.AsSlice()) m.cfg.Subscribe(m) close(m.started) @@ -519,13 +522,9 @@ func (m *model) restartFolder(from, to config.FolderConfiguration, cacheIgnoredF // In case the folder was newly shared with us we already got a // cluster config and wont necessarily get another soon - start // sending indexes if connected. - isNew := !from.SharedWith(indexSenders.deviceID) - if isNew { - indexSenders.addNew(to, fset) - } if to.Paused { indexSenders.pause(to.ID) - } else if !isNew && (fsetNil || from.Paused) { + } else if !from.SharedWith(indexSenders.deviceID) || fsetNil || from.Paused { indexSenders.resume(to, fset) } } @@ -554,20 +553,10 @@ func (m *model) newFolder(cfg config.FolderConfiguration, cacheIgnoredFiles bool // Cluster configs might be received and processed before reaching this // point, i.e. before the folder is started. If that's the case, start // index senders here. - localSequenceZero := fset.Sequence(protocol.LocalDeviceID) == 0 m.pmut.RLock() for _, id := range cfg.DeviceIDs() { if is, ok := m.indexSenders[id]; ok { - if localSequenceZero && fset.Sequence(id) == 0 { - // In case this folder was shared to us and - // newly added, add a new index sender. - is.addNew(cfg, fset) - } else { - // For existing folders we stored the index data from - // the cluster config, so resume based on that - if - // we didn't get a cluster config yet, it's a noop. - is.resume(cfg, fset) - } + is.resume(cfg, fset) } } m.pmut.RUnlock() @@ -1175,6 +1164,7 @@ func (m *model) ccHandleFolders(folders []protocol.Folder, deviceCfg config.Devi continue } m.cfg.AddOrUpdatePendingFolder(folder.ID, folder.Label, deviceID) + indexSenders.addPending(cfg, ccDeviceInfos[folder.ID]) changed = true m.evLogger.Log(events.FolderRejected, map[string]string{ "folder": folder.ID, @@ -1192,7 +1182,7 @@ func (m *model) ccHandleFolders(folders []protocol.Folder, deviceCfg config.Devi } if cfg.Paused { - indexSenders.addPaused(cfg, ccDeviceInfos[folder.ID]) + indexSenders.addPending(cfg, ccDeviceInfos[folder.ID]) continue } @@ -1234,7 +1224,7 @@ func (m *model) ccHandleFolders(folders []protocol.Folder, deviceCfg config.Devi // Shouldn't happen because !cfg.Paused, but might happen // if the folder is about to be unpaused, but not yet. l.Debugln("ccH: no fset", folder.ID) - indexSenders.addPaused(cfg, ccDeviceInfos[folder.ID]) + indexSenders.addPending(cfg, ccDeviceInfos[folder.ID]) continue } @@ -2482,7 +2472,7 @@ func (m *model) CommitConfiguration(from, to config.Configuration) bool { // Go through the folder configs and figure out if we need to restart or not. // Tracks devices affected by any configuration change to resend ClusterConfig. - clusterConfigDevices := make(map[protocol.DeviceID]struct{}, len(from.Devices)+len(to.Devices)) + clusterConfigDevices := make(deviceIDSet, len(from.Devices)+len(to.Devices)) fromFolders := mapFolders(from.Folders) toFolders := mapFolders(to.Folders) @@ -2495,7 +2485,7 @@ func (m *model) CommitConfiguration(from, to config.Configuration) bool { l.Infoln("Adding folder", cfg.Description()) m.newFolder(cfg, to.Options.CacheIgnoredFiles) } - clusterConfigDevices = addDeviceIDsToMap(clusterConfigDevices, cfg.DeviceIDs()) + clusterConfigDevices.add(cfg.DeviceIDs()) } } @@ -2504,7 +2494,7 @@ func (m *model) CommitConfiguration(from, to config.Configuration) bool { if !ok { // The folder was removed. m.removeFolder(fromCfg) - clusterConfigDevices = addDeviceIDsToMap(clusterConfigDevices, fromCfg.DeviceIDs()) + clusterConfigDevices.add(fromCfg.DeviceIDs()) continue } @@ -2516,8 +2506,8 @@ func (m *model) CommitConfiguration(from, to config.Configuration) bool { // Check if anything differs that requires a restart. if !reflect.DeepEqual(fromCfg.RequiresRestartOnly(), toCfg.RequiresRestartOnly()) || from.Options.CacheIgnoredFiles != to.Options.CacheIgnoredFiles { m.restartFolder(fromCfg, toCfg, to.Options.CacheIgnoredFiles) - clusterConfigDevices = addDeviceIDsToMap(clusterConfigDevices, fromCfg.DeviceIDs()) - clusterConfigDevices = addDeviceIDsToMap(clusterConfigDevices, toCfg.DeviceIDs()) + clusterConfigDevices.add(fromCfg.DeviceIDs()) + clusterConfigDevices.add(toCfg.DeviceIDs()) } // Emit the folder pause/resume event @@ -2591,11 +2581,7 @@ func (m *model) CommitConfiguration(from, to config.Configuration) bool { } m.pmut.RUnlock() // Generating cluster-configs acquires fmut -> must happen outside of pmut. - ids := make([]protocol.DeviceID, 0, len(clusterConfigDevices)) - for id := range clusterConfigDevices { - ids = append(ids, id) - } - m.resendClusterConfig(ids) + m.resendClusterConfig(clusterConfigDevices.AsSlice()) m.globalRequestLimiter.setCapacity(1024 * to.Options.MaxConcurrentIncomingRequestKiB()) m.folderIOLimiter.setCapacity(to.Options.MaxFolderConcurrency()) @@ -2801,13 +2787,22 @@ func sanitizePath(path string) string { return strings.TrimSpace(b.String()) } -func addDeviceIDsToMap(m map[protocol.DeviceID]struct{}, s []protocol.DeviceID) map[protocol.DeviceID]struct{} { - for _, id := range s { - if _, ok := m[id]; !ok { - m[id] = struct{}{} +type deviceIDSet map[protocol.DeviceID]struct{} + +func (s deviceIDSet) add(ids []protocol.DeviceID) { + for _, id := range ids { + if _, ok := s[id]; !ok { + s[id] = struct{}{} } } - return m +} + +func (s deviceIDSet) AsSlice() []protocol.DeviceID { + ids := make([]protocol.DeviceID, 0, len(s)) + for id := range s { + ids = append(ids, id) + } + return ids } func encryptionTokenPath(cfg config.FolderConfiguration) string { diff --git a/lib/model/requests_test.go b/lib/model/requests_test.go index a2b7db5e6..58e2171a4 100644 --- a/lib/model/requests_test.go +++ b/lib/model/requests_test.go @@ -1275,9 +1275,6 @@ func TestRequestIndexSenderPause(t *testing.T) { } func TestRequestIndexSenderClusterConfigBeforeStart(t *testing.T) { - done := make(chan struct{}) - defer close(done) - ldb := db.NewLowlevel(backend.OpenMemory()) w, fcfg := tmpDefaultWrapper() tfs := fcfg.Filesystem() @@ -1294,11 +1291,14 @@ func TestRequestIndexSenderClusterConfigBeforeStart(t *testing.T) { m.evCancel() <-m.stopped - // Add connection (sends cluster config) before starting the new model + // Add connection (sends incoming cluster config) before starting the new model m = newModel(w, myID, "syncthing", "dev", ldb, nil) defer cleanupModel(m) fc := addFakeConn(m, device1) - indexChan := make(chan []protocol.FileInfo) + done := make(chan struct{}) + defer close(done) // Must be the last thing to be deferred, thus first to run. + indexChan := make(chan []protocol.FileInfo, 1) + ccChan := make(chan protocol.ClusterConfig, 1) fc.mut.Lock() fc.indexFn = func(_ context.Context, folder string, fs []protocol.FileInfo) { select { @@ -1306,16 +1306,30 @@ func TestRequestIndexSenderClusterConfigBeforeStart(t *testing.T) { case <-done: } } + fc.clusterConfigFn = func(cc protocol.ClusterConfig) { + select { + case ccChan <- cc: + case <-done: + } + } fc.mut.Unlock() m.ServeBackground() - <-m.started + + timeout := time.After(5 * time.Second) + + // Check that cluster-config is resent after adding folders when starting model + select { + case <-timeout: + t.Fatal("timed out before receiving cluster-config") + case <-ccChan: + } // Check that an index is sent for the newly added item must(t, tfs.Mkdir(dir2, 0777)) m.ScanFolders() select { - case <-time.After(5 * time.Second): + case <-timeout: t.Fatal("timed out before receiving index") case <-indexChan: }