diff --git a/lib/model/fakeconns_test.go b/lib/model/fakeconns_test.go index d3c0077c0..4022e4df0 100644 --- a/lib/model/fakeconns_test.go +++ b/lib/model/fakeconns_test.go @@ -35,6 +35,7 @@ type fakeConnection struct { indexFn func(context.Context, string, []protocol.FileInfo) requestFn func(ctx context.Context, folder, name string, offset int64, size int, hash []byte, fromTemporary bool) ([]byte, error) closeFn func(error) + clusterConfigFn func(protocol.ClusterConfig) mut sync.Mutex } @@ -91,7 +92,13 @@ func (f *fakeConnection) Request(ctx context.Context, folder, name string, offse return f.fileData[name], nil } -func (f *fakeConnection) ClusterConfig(protocol.ClusterConfig) {} +func (f *fakeConnection) ClusterConfig(cc protocol.ClusterConfig) { + f.mut.Lock() + defer f.mut.Unlock() + if f.clusterConfigFn != nil { + f.clusterConfigFn(cc) + } +} func (f *fakeConnection) Ping() bool { f.mut.Lock() diff --git a/lib/model/model.go b/lib/model/model.go index 002ed6654..cf6884ec8 100644 --- a/lib/model/model.go +++ b/lib/model/model.go @@ -150,6 +150,7 @@ type model struct { helloMessages map[protocol.DeviceID]protocol.Hello deviceDownloads map[protocol.DeviceID]*deviceDownloadState remotePausedFolders map[protocol.DeviceID][]string // deviceID -> folders + indexSenderTokens map[protocol.DeviceID][]suture.ServiceToken foldersRunning int32 // for testing only } @@ -222,6 +223,7 @@ func NewModel(cfg config.Wrapper, id protocol.DeviceID, clientName, clientVersio helloMessages: make(map[protocol.DeviceID]protocol.Hello), deviceDownloads: make(map[protocol.DeviceID]*deviceDownloadState), remotePausedFolders: make(map[protocol.DeviceID][]string), + indexSenderTokens: make(map[protocol.DeviceID][]suture.ServiceToken), } for devID := range cfg.Devices() { m.deviceStatRefs[devID] = stats.NewDeviceStatisticsReference(m.db, devID.String()) @@ -257,13 +259,16 @@ func (m *model) onServe() { func (m *model) Stop() { m.cfg.Unsubscribe(m) m.Supervisor.Stop() - devs := m.cfg.Devices() - ids := make([]protocol.DeviceID, 0, len(devs)) - for id := range devs { - ids = append(ids, id) + m.pmut.RLock() + closed := make([]chan struct{}, 0, len(m.conn)) + for id, conn := range m.conn { + closed = append(closed, m.closed[id]) + go conn.Close(errStopped) + } + m.pmut.RUnlock() + for _, c := range closed { + <-c } - w := m.closeConns(ids, errStopped) - w.Wait() } // StartDeadlockDetector starts a deadlock detector on the models locks which @@ -393,7 +398,12 @@ func (m *model) warnAboutOverwritingProtectedFiles(cfg config.FolderConfiguratio } func (m *model) removeFolder(cfg config.FolderConfiguration) { - m.stopFolder(cfg, fmt.Errorf("removing folder %v", cfg.Description())) + m.fmut.RLock() + token, ok := m.folderRunnerToken[cfg.ID] + m.fmut.RUnlock() + if ok { + m.RemoveAndWait(token, 0) + } m.fmut.Lock() @@ -417,22 +427,6 @@ func (m *model) removeFolder(cfg config.FolderConfiguration) { db.DropFolder(m.db, cfg.ID) } -func (m *model) stopFolder(cfg config.FolderConfiguration, err error) { - // Stop the services running for this folder and wait for them to finish - // stopping to prevent races on restart. - m.fmut.RLock() - token, ok := m.folderRunnerToken[cfg.ID] - m.fmut.RUnlock() - - if ok { - m.RemoveAndWait(token, 0) - } - - // Wait for connections to stop to ensure that no more calls to methods - // expecting this folder to exist happen (e.g. .IndexUpdate). - m.closeConns(cfg.DeviceIDs(), err).Wait() -} - // Need to hold lock on m.fmut when calling this. func (m *model) cleanupFolderLocked(cfg config.FolderConfiguration) { // clear up our config maps @@ -464,25 +458,13 @@ func (m *model) restartFolder(from, to config.FolderConfiguration, cacheIgnoredF restartMut.Lock() defer restartMut.Unlock() - var infoMsg string - var errMsg string - switch { - case to.Paused: - infoMsg = "Paused" - errMsg = "pausing" - case from.Paused: - infoMsg = "Unpaused" - errMsg = "unpausing" - default: - infoMsg = "Restarted" - errMsg = "restarting" + m.fmut.RLock() + token, ok := m.folderRunnerToken[from.ID] + m.fmut.RUnlock() + if ok { + m.RemoveAndWait(token, 0) } - err := fmt.Errorf("%v folder %v", errMsg, to.Description()) - m.stopFolder(from, err) - // Need to send CC change to both from and to devices. - m.closeConns(to.DeviceIDs(), err) - m.fmut.Lock() defer m.fmut.Unlock() @@ -499,6 +481,16 @@ func (m *model) restartFolder(from, to config.FolderConfiguration, cacheIgnoredF } m.addAndStartFolderLocked(to, fset, cacheIgnoredFiles) } + + var infoMsg string + switch { + case to.Paused: + infoMsg = "Paused" + case from.Paused: + infoMsg = "Unpaused" + default: + infoMsg = "Restarted" + } l.Infof("%v folder %v (%v)", infoMsg, to.Description(), to.Type) } @@ -507,9 +499,6 @@ func (m *model) newFolder(cfg config.FolderConfiguration, cacheIgnoredFiles bool // we do it outside of the lock. fset := db.NewFileSet(cfg.ID, cfg.Filesystem(), m.db) - // Close connections to affected devices - m.closeConns(cfg.DeviceIDs(), fmt.Errorf("started folder %v", cfg.Description())) - m.fmut.Lock() defer m.fmut.Unlock() m.addAndStartFolderLocked(cfg, fset, cacheIgnoredFiles) @@ -992,6 +981,9 @@ func (m *model) ClusterConfig(deviceID protocol.DeviceID, cm protocol.ClusterCon m.pmut.RLock() conn, ok := m.conn[deviceID] closed := m.closed[deviceID] + for _, token := range m.indexSenderTokens[deviceID] { + m.RemoveAndWait(token, 0) + } m.pmut.RUnlock() if !ok { panic("bug: ClusterConfig called on closed or nonexistent connection") @@ -1024,6 +1016,7 @@ func (m *model) ClusterConfig(deviceID protocol.DeviceID, cm protocol.ClusterCon } var paused []string + indexSenderTokens := make([]suture.ServiceToken, 0, len(cm.Folders)) for _, folder := range cm.Folders { cfg, ok := m.cfg.Folder(folder.ID) if !ok || !cfg.SharedWith(deviceID) { @@ -1142,14 +1135,12 @@ func (m *model) ClusterConfig(deviceID protocol.DeviceID, cm protocol.ClusterCon evLogger: m.evLogger, } is.Service = util.AsService(is.serve, is.String()) - // The token isn't tracked as the service stops when the connection - // terminates and is automatically removed from supervisor (by - // implementing suture.IsCompletable). - m.Add(is) + indexSenderTokens = append(indexSenderTokens, m.Add(is)) } m.pmut.Lock() m.remotePausedFolders[deviceID] = paused + m.indexSenderTokens[deviceID] = indexSenderTokens m.pmut.Unlock() // This breaks if we send multiple CM messages during the same connection. @@ -1397,41 +1388,6 @@ func (m *model) Closed(conn protocol.Connection, err error) { close(closed) } -// closeConns will close the underlying connection for given devices and return -// a waiter that will return once all the connections are finished closing. -func (m *model) closeConns(devs []protocol.DeviceID, err error) config.Waiter { - conns := make([]connections.Connection, 0, len(devs)) - closed := make([]chan struct{}, 0, len(devs)) - m.pmut.RLock() - for _, dev := range devs { - if conn, ok := m.conn[dev]; ok { - conns = append(conns, conn) - closed = append(closed, m.closed[dev]) - } - } - m.pmut.RUnlock() - for _, conn := range conns { - conn.Close(err) - } - return &channelWaiter{chans: closed} -} - -// closeConn closes the underlying connection for the given device and returns -// a waiter that will return once the connection is finished closing. -func (m *model) closeConn(dev protocol.DeviceID, err error) config.Waiter { - return m.closeConns([]protocol.DeviceID{dev}, err) -} - -type channelWaiter struct { - chans []chan struct{} -} - -func (w *channelWaiter) Wait() { - for _, c := range w.chans { - <-c - } -} - // Implements protocol.RequestResponse type requestResponse struct { data []byte @@ -2467,6 +2423,9 @@ func (m *model) CommitConfiguration(from, to config.Configuration) bool { // Go through the folder configs and figure out if we need to restart or not. + // Tracks devices affected by any configuration change to resend ClusterConfig. + clusterConfigDevices := make(map[protocol.DeviceID]struct{}, len(from.Devices)+len(to.Devices)) + fromFolders := mapFolders(from.Folders) toFolders := mapFolders(to.Folders) for folderID, cfg := range toFolders { @@ -2478,6 +2437,7 @@ func (m *model) CommitConfiguration(from, to config.Configuration) bool { l.Infoln("Adding folder", cfg.Description()) m.newFolder(cfg, to.Options.CacheIgnoredFiles) } + clusterConfigDevices = addDeviceIDsToMap(clusterConfigDevices, cfg.DeviceIDs()) } } @@ -2486,6 +2446,7 @@ func (m *model) CommitConfiguration(from, to config.Configuration) bool { if !ok { // The folder was removed. m.removeFolder(fromCfg) + clusterConfigDevices = addDeviceIDsToMap(clusterConfigDevices, fromCfg.DeviceIDs()) continue } @@ -2497,6 +2458,8 @@ func (m *model) CommitConfiguration(from, to config.Configuration) bool { // Check if anything differs that requires a restart. if !reflect.DeepEqual(fromCfg.RequiresRestartOnly(), toCfg.RequiresRestartOnly()) || from.Options.CacheIgnoredFiles != to.Options.CacheIgnoredFiles { m.restartFolder(fromCfg, toCfg, to.Options.CacheIgnoredFiles) + clusterConfigDevices = addDeviceIDsToMap(clusterConfigDevices, fromCfg.DeviceIDs()) + clusterConfigDevices = addDeviceIDsToMap(clusterConfigDevices, toCfg.DeviceIDs()) } // Emit the folder pause/resume event @@ -2519,6 +2482,7 @@ func (m *model) CommitConfiguration(from, to config.Configuration) bool { // Pausing a device, unpausing is handled by the connection service. fromDevices := from.DeviceMap() toDevices := to.DeviceMap() + closeDevices := make([]protocol.DeviceID, 0, len(to.Devices)) for deviceID, toCfg := range toDevices { fromCfg, ok := fromDevices[deviceID] if !ok { @@ -2534,13 +2498,14 @@ func (m *model) CommitConfiguration(from, to config.Configuration) bool { } // Ignored folder was removed, reconnect to retrigger the prompt. - if len(fromCfg.IgnoredFolders) > len(toCfg.IgnoredFolders) { - m.closeConn(deviceID, errIgnoredFolderRemoved) + if !toCfg.Paused && len(fromCfg.IgnoredFolders) > len(toCfg.IgnoredFolders) { + closeDevices = append(closeDevices, deviceID) } if toCfg.Paused { l.Infoln("Pausing", deviceID) - m.closeConn(deviceID, errDevicePaused) + closeDevices = append(closeDevices, deviceID) + delete(clusterConfigDevices, deviceID) m.evLogger.Log(events.DevicePaused, map[string]string{"device": deviceID.String()}) } else { m.evLogger.Log(events.DeviceResumed, map[string]string{"device": deviceID.String()}) @@ -2551,9 +2516,28 @@ func (m *model) CommitConfiguration(from, to config.Configuration) bool { for deviceID := range fromDevices { delete(m.deviceStatRefs, deviceID) removedDevices = append(removedDevices, deviceID) + delete(clusterConfigDevices, deviceID) } m.fmut.Unlock() - m.closeConns(removedDevices, errDeviceRemoved) + + m.pmut.RLock() + for _, id := range closeDevices { + if conn, ok := m.conn[id]; ok { + go conn.Close(errDevicePaused) + } + } + for _, id := range removedDevices { + if conn, ok := m.conn[id]; ok { + go conn.Close(errDeviceRemoved) + } + } + for id := range clusterConfigDevices { + if conn, ok := m.conn[id]; ok { + cm := m.generateClusterConfig(conn.ID()) + go conn.ClusterConfig(cm) + } + } + m.pmut.RUnlock() m.globalRequestLimiter.setCapacity(1024 * to.Options.MaxConcurrentIncomingRequestKiB()) m.folderIOLimiter.setCapacity(to.Options.MaxFolderConcurrency()) @@ -2758,3 +2742,12 @@ func sanitizePath(path string) string { return strings.TrimSpace(b.String()) } + +func addDeviceIDsToMap(m map[protocol.DeviceID]struct{}, s []protocol.DeviceID) map[protocol.DeviceID]struct{} { + for _, id := range s { + if _, ok := m[id]; !ok { + m[id] = struct{}{} + } + } + return m +} diff --git a/lib/model/model_test.go b/lib/model/model_test.go index 3214c964a..5fec5829b 100644 --- a/lib/model/model_test.go +++ b/lib/model/model_test.go @@ -3339,17 +3339,19 @@ func TestConnCloseOnRestart(t *testing.T) { closed := m.closed[device1] m.pmut.RUnlock() - newFcfg := fcfg.Copy() - newFcfg.Paused = true + waiter, err := w.RemoveDevice(device1) + if err != nil { + t.Fatal(err) + } done := make(chan struct{}) go func() { - m.restartFolder(fcfg, newFcfg, false) + waiter.Wait() close(done) }() select { case <-done: case <-time.After(5 * time.Second): - t.Fatal("Timed out before folder restart returned") + t.Fatal("Timed out before config took effect") } select { case <-closed: @@ -3845,8 +3847,8 @@ func TestScanRenameCaseOnly(t *testing.T) { }) } -func TestConnectionTerminationOnFolderAdd(t *testing.T) { - testConfigChangeClosesConnections(t, false, true, nil, func(cfg config.Wrapper) { +func TestClusterConfigOnFolderAdd(t *testing.T) { + testConfigChangeTriggersClusterConfigs(t, false, true, nil, func(cfg config.Wrapper) { fcfg := testFolderConfigTmp() fcfg.ID = "second" fcfg.Label = "second" @@ -3859,8 +3861,8 @@ func TestConnectionTerminationOnFolderAdd(t *testing.T) { }) } -func TestConnectionTerminationOnFolderShare(t *testing.T) { - testConfigChangeClosesConnections(t, true, true, nil, func(cfg config.Wrapper) { +func TestClusterConfigOnFolderShare(t *testing.T) { + testConfigChangeTriggersClusterConfigs(t, true, true, nil, func(cfg config.Wrapper) { fcfg := cfg.FolderList()[0] fcfg.Devices = []config.FolderDeviceConfiguration{{device2, protocol.EmptyDeviceID}} if w, err := cfg.SetFolder(fcfg); err != nil { @@ -3871,8 +3873,8 @@ func TestConnectionTerminationOnFolderShare(t *testing.T) { }) } -func TestConnectionTerminationOnFolderUnshare(t *testing.T) { - testConfigChangeClosesConnections(t, true, false, nil, func(cfg config.Wrapper) { +func TestClusterConfigOnFolderUnshare(t *testing.T) { + testConfigChangeTriggersClusterConfigs(t, true, false, nil, func(cfg config.Wrapper) { fcfg := cfg.FolderList()[0] fcfg.Devices = nil if w, err := cfg.SetFolder(fcfg); err != nil { @@ -3883,8 +3885,8 @@ func TestConnectionTerminationOnFolderUnshare(t *testing.T) { }) } -func TestConnectionTerminationOnFolderRemove(t *testing.T) { - testConfigChangeClosesConnections(t, true, false, nil, func(cfg config.Wrapper) { +func TestClusterConfigOnFolderRemove(t *testing.T) { + testConfigChangeTriggersClusterConfigs(t, true, false, nil, func(cfg config.Wrapper) { rcfg := cfg.RawCopy() rcfg.Folders = nil if w, err := cfg.Replace(rcfg); err != nil { @@ -3895,8 +3897,8 @@ func TestConnectionTerminationOnFolderRemove(t *testing.T) { }) } -func TestConnectionTerminationOnFolderPause(t *testing.T) { - testConfigChangeClosesConnections(t, true, false, nil, func(cfg config.Wrapper) { +func TestClusterConfigOnFolderPause(t *testing.T) { + testConfigChangeTriggersClusterConfigs(t, true, false, nil, func(cfg config.Wrapper) { fcfg := cfg.FolderList()[0] fcfg.Paused = true if w, err := cfg.SetFolder(fcfg); err != nil { @@ -3907,8 +3909,8 @@ func TestConnectionTerminationOnFolderPause(t *testing.T) { }) } -func TestConnectionTerminationOnFolderUnpause(t *testing.T) { - testConfigChangeClosesConnections(t, true, false, func(cfg config.Wrapper) { +func TestClusterConfigOnFolderUnpause(t *testing.T) { + testConfigChangeTriggersClusterConfigs(t, true, false, func(cfg config.Wrapper) { fcfg := cfg.FolderList()[0] fcfg.Paused = true if w, err := cfg.SetFolder(fcfg); err != nil { @@ -3984,7 +3986,7 @@ func TestScanDeletedROChangedOnSR(t *testing.T) { } } -func testConfigChangeClosesConnections(t *testing.T, expectFirstClosed, expectSecondClosed bool, pre func(config.Wrapper), fn func(config.Wrapper)) { +func testConfigChangeTriggersClusterConfigs(t *testing.T, expectFirst, expectSecond bool, pre func(config.Wrapper), fn func(config.Wrapper)) { t.Helper() wcfg, _ := tmpDefaultWrapper() m := setupModel(wcfg) @@ -3999,21 +4001,55 @@ func testConfigChangeClosesConnections(t *testing.T, expectFirstClosed, expectSe pre(wcfg) } - fc1 := &fakeConnection{id: device1, model: m} - fc2 := &fakeConnection{id: device2, model: m} + cc1 := make(chan struct{}, 1) + cc2 := make(chan struct{}, 1) + fc1 := &fakeConnection{ + id: device1, + model: m, + clusterConfigFn: func(_ protocol.ClusterConfig) { + cc1 <- struct{}{} + }, + } + fc2 := &fakeConnection{ + id: device2, + model: m, + clusterConfigFn: func(_ protocol.ClusterConfig) { + cc2 <- struct{}{} + }, + } m.AddConnection(fc1, protocol.Hello{}) m.AddConnection(fc2, protocol.Hello{}) + // Initial CCs + select { + case <-cc1: + default: + t.Fatal("missing initial CC from device1") + } + select { + case <-cc2: + default: + t.Fatal("missing initial CC from device2") + } + t.Log("Applying config change") fn(wcfg) - if expectFirstClosed != fc1.closed { - t.Errorf("first connection state mismatch: %t (expected) != %t", expectFirstClosed, fc1.closed) + timeout := time.NewTimer(time.Second) + if expectFirst { + select { + case <-cc1: + case <-timeout.C: + t.Errorf("timed out before receiving cluste rconfig for first device") + } } - - if expectSecondClosed != fc2.closed { - t.Errorf("second connection state mismatch: %t (expected) != %t", expectSecondClosed, fc2.closed) + if expectSecond { + select { + case <-cc2: + case <-timeout.C: + t.Errorf("timed out before receiving cluste rconfig for second device") + } } } diff --git a/lib/protocol/protocol.go b/lib/protocol/protocol.go index 6c1420b0f..d53dbf071 100644 --- a/lib/protocol/protocol.go +++ b/lib/protocol/protocol.go @@ -322,11 +322,9 @@ func (c *rawConnection) Request(ctx context.Context, folder string, name string, } // ClusterConfig sends the cluster configuration message to the peer. -// It must be called just once (as per BEP), otherwise it will panic. func (c *rawConnection) ClusterConfig(config ClusterConfig) { select { case c.clusterConfigBox <- &config: - close(c.clusterConfigBox) case <-c.closed: } } @@ -386,13 +384,12 @@ func (c *rawConnection) dispatcherLoop() (err error) { switch msg := msg.(type) { case *ClusterConfig: l.Debugln("read ClusterConfig message") - if state != stateInitial { - return fmt.Errorf("protocol error: cluster config message in state %d", state) + if state == stateInitial { + state = stateReady } if err := c.receiver.ClusterConfig(c.id, *msg); err != nil { return errors.Wrap(err, "receiver error") } - state = stateReady case *Index: l.Debugln("read Index message") @@ -683,6 +680,12 @@ func (c *rawConnection) writerLoop() { } for { select { + case cc := <-c.clusterConfigBox: + err := c.writeMessage(cc) + if err != nil { + c.internalClose(err) + return + } case hm := <-c.outbox: err := c.writeMessage(hm.msg) if hm.done != nil {