Compare commits

...

13 Commits

Author SHA1 Message Date
Jakob Borg
d0c3697152 cmd/syncthing: Print version information early (fixes #5891) (#5893) 2019-07-27 20:36:33 +02:00
Simon Frei
35f40e9a58 lib/model: Create new file-set after stopping folder (fixes #5882) (#5883) 2019-07-23 20:39:25 +02:00
Simon Frei
5de9b677c2 lib/fs: Fix kqueue event list (fixes #5308) (#5885) 2019-07-23 14:11:15 +02:00
Simon Frei
6f08162376 lib/model: Remove incorrect/useless panics (#5881) 2019-07-23 10:51:16 +02:00
Simon Frei
7b3d9a8dca lib/syncthing: Refactor to use util.AsService (#5858) 2019-07-23 10:50:37 +02:00
Simon Frei
942659fb06 lib/model, lib/nat: More service termination speedup (#5884) 2019-07-23 10:49:22 +02:00
dependabot-preview[bot]
15c262184b build(deps): bump github.com/maruel/panicparse from 1.2.1 to 1.3.0 (#5879)
Bumps [github.com/maruel/panicparse](https://github.com/maruel/panicparse) from 1.2.1 to 1.3.0.
- [Release notes](https://github.com/maruel/panicparse/releases)
- [Commits](https://github.com/maruel/panicparse/compare/v1.2.1...v1.3.0)

Signed-off-by: dependabot-preview[bot] <support@dependabot.com>
2019-07-22 19:46:52 +01:00
dependabot-preview[bot]
484fa0592e build(deps): bump github.com/lib/pq from 1.1.1 to 1.2.0 (#5878)
Bumps [github.com/lib/pq](https://github.com/lib/pq) from 1.1.1 to 1.2.0.
- [Release notes](https://github.com/lib/pq/releases)
- [Commits](https://github.com/lib/pq/compare/v1.1.1...v1.2.0)

Signed-off-by: dependabot-preview[bot] <support@dependabot.com>
2019-07-22 08:07:21 +01:00
Simon Frei
b5b54ff057 lib/model: No watch-error on missing folder (fixes #5833) (#5876) 2019-07-19 19:41:16 +02:00
Simon Frei
4d3432af3e lib: Ensure timely service termination (fixes #5860) (#5863) 2019-07-19 19:40:40 +02:00
Simon Frei
1cb55904bc lib/model: Prevent panic in NeedFolderFiles (fixes #5872) (#5875) 2019-07-19 19:39:52 +02:00
Simon Frei
2b622d0774 lib/model: Close conn on dev pause (fixes #5873) (#5874) 2019-07-19 19:37:29 +02:00
Simon Frei
1894123d3c lib/syncthing: Modify exit status before stopping (fixes #5869) (#5870) 2019-07-18 20:49:00 +02:00
21 changed files with 344 additions and 255 deletions

View File

@@ -548,6 +548,14 @@ func upgradeViaRest() error {
}
func syncthingMain(runtimeOptions RuntimeOptions) {
// Set a log prefix similar to the ID we will have later on, or early log
// lines look ugly.
l.SetPrefix("[start] ")
// Print our version information up front, so any crash that happens
// early etc. will have it available.
l.Infoln(build.LongVersion)
// Ensure that we have a certificate and key.
cert, err := tls.LoadX509KeyPair(
locations.Get(locations.CertFile),

4
go.mod
View File

@@ -19,9 +19,9 @@ require (
github.com/jackpal/gateway v0.0.0-20161225004348-5795ac81146e
github.com/kballard/go-shellquote v0.0.0-20170619183022-cd60e84ee657
github.com/kr/pretty v0.1.0 // indirect
github.com/lib/pq v1.1.1
github.com/lib/pq v1.2.0
github.com/lucas-clemente/quic-go v0.11.2
github.com/maruel/panicparse v1.2.1
github.com/maruel/panicparse v1.3.0
github.com/mattn/go-isatty v0.0.7
github.com/minio/sha256-simd v0.0.0-20190117184323-cc1980cb0338
github.com/onsi/ginkgo v1.8.0 // indirect

6
go.sum
View File

@@ -72,18 +72,24 @@ github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/lib/pq v1.1.1 h1:sJZmqHoEaY7f+NPP8pgLB/WxulyR3fewgCM2qaSlBb4=
github.com/lib/pq v1.1.1/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
github.com/lib/pq v1.2.0 h1:LXpIM/LZ5xGFhOpXAQUIMM1HdyqzVYM13zNdjCEEcA0=
github.com/lib/pq v1.2.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
github.com/lucas-clemente/quic-go v0.11.2 h1:Mop0ac3zALaBR3wGs6j8OYe/tcFvFsxTUFMkE/7yUOI=
github.com/lucas-clemente/quic-go v0.11.2/go.mod h1:PpMmPfPKO9nKJ/psF49ESTAGQSdfXxlg1otPbEB2nOw=
github.com/marten-seemann/qtls v0.2.3 h1:0yWJ43C62LsZt08vuQJDK1uC1czUc3FJeCLPoNAI4vA=
github.com/marten-seemann/qtls v0.2.3/go.mod h1:xzjG7avBwGGbdZ8dTGxlBnLArsVKLvwmjgmPuiQEcYk=
github.com/maruel/panicparse v1.2.1 h1:mNlHGiakrixj+AwF/qRpTwnj+zsWYPRLQ7wRqnJsfO0=
github.com/maruel/panicparse v1.2.1/go.mod h1:vszMjr5QQ4F5FSRfraldcIA/BCw5xrdLL+zEcU2nRBs=
github.com/maruel/panicparse v1.3.0 h1:1Ep/RaYoSL1r5rTILHQQbyzHG8T4UP5ZbQTYTo4bdDc=
github.com/maruel/panicparse v1.3.0/go.mod h1:vszMjr5QQ4F5FSRfraldcIA/BCw5xrdLL+zEcU2nRBs=
github.com/mattn/go-colorable v0.1.1 h1:G1f5SKeVxmagw/IyvzvtZE4Gybcc4Tr1tf7I8z0XgOg=
github.com/mattn/go-colorable v0.1.1/go.mod h1:FuOcm+DKB9mbwrcAfNl7/TZVBZ6rcnceauSikq3lYCQ=
github.com/mattn/go-isatty v0.0.5/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s=
github.com/mattn/go-isatty v0.0.7 h1:UvyT9uN+3r7yLEYSlJsbQGdsaB/a0DlgWP3pql6iwOc=
github.com/mattn/go-isatty v0.0.7/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s=
github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU=
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
github.com/mgutz/ansi v0.0.0-20170206155736-9520e82c474b h1:j7+1HpAFS1zy5+Q4qx1fWh90gTKwiN4QCGoY9TWyyO4=
github.com/mgutz/ansi v0.0.0-20170206155736-9520e82c474b/go.mod h1:01TrycV0kFyexm33Z7vhZRXopbI8J3TDReVlkTgMUxE=
github.com/minio/sha256-simd v0.0.0-20190117184323-cc1980cb0338 h1:USW1+zAUkUSvk097CAX/i8KR3r6f+DHNhk6Xe025Oyw=
github.com/minio/sha256-simd v0.0.0-20190117184323-cc1980cb0338/go.mod h1:2FMWW+8GMoPweT6+pI63m9YE3Lmw4J71hV56Chs1E/U=

View File

@@ -8,7 +8,6 @@ package beacon
import (
"net"
stdsync "sync"
"github.com/thejerf/suture"
)
@@ -24,21 +23,3 @@ type Interface interface {
Recv() ([]byte, net.Addr)
Error() error
}
// errorHolder is a small mutex-guarded error value intended to be embedded
// in other types so they gain setError/Error accessors.
type errorHolder struct {
// err is the most recently recorded error; nil when cleared.
err error
mut stdsync.Mutex // uses stdlib sync as I want this to be trivially embeddable, and there is no risk of blocking
}
// setError records err as the current error; passing nil clears it.
func (e *errorHolder) setError(err error) {
e.mut.Lock()
e.err = err
e.mut.Unlock()
}
// Error returns the most recently recorded error (or nil) under the lock.
func (e *errorHolder) Error() error {
e.mut.Lock()
err := e.err
e.mut.Unlock()
return err
}

View File

@@ -11,8 +11,9 @@ import (
"net"
"time"
"github.com/syncthing/syncthing/lib/sync"
"github.com/thejerf/suture"
"github.com/syncthing/syncthing/lib/util"
)
type Broadcast struct {
@@ -44,16 +45,16 @@ func NewBroadcast(port int) *Broadcast {
}
b.br = &broadcastReader{
port: port,
outbox: b.outbox,
connMut: sync.NewMutex(),
port: port,
outbox: b.outbox,
}
b.br.ServiceWithError = util.AsServiceWithError(b.br.serve)
b.Add(b.br)
b.bw = &broadcastWriter{
port: port,
inbox: b.inbox,
connMut: sync.NewMutex(),
port: port,
inbox: b.inbox,
}
b.bw.ServiceWithError = util.AsServiceWithError(b.bw.serve)
b.Add(b.bw)
return b
@@ -76,34 +77,42 @@ func (b *Broadcast) Error() error {
}
type broadcastWriter struct {
port int
inbox chan []byte
conn *net.UDPConn
connMut sync.Mutex
errorHolder
util.ServiceWithError
port int
inbox chan []byte
}
func (w *broadcastWriter) Serve() {
func (w *broadcastWriter) serve(stop chan struct{}) error {
l.Debugln(w, "starting")
defer l.Debugln(w, "stopping")
conn, err := net.ListenUDP("udp4", nil)
if err != nil {
l.Debugln(err)
w.setError(err)
return
return err
}
defer conn.Close()
done := make(chan struct{})
defer close(done)
go func() {
select {
case <-stop:
case <-done:
}
conn.Close()
}()
w.connMut.Lock()
w.conn = conn
w.connMut.Unlock()
for {
var bs []byte
select {
case bs = <-w.inbox:
case <-stop:
return nil
}
for bs := range w.inbox {
addrs, err := net.InterfaceAddrs()
if err != nil {
l.Debugln(err)
w.setError(err)
w.SetError(err)
continue
}
@@ -134,14 +143,13 @@ func (w *broadcastWriter) Serve() {
// Write timeouts should not happen. We treat it as a fatal
// error on the socket.
l.Debugln(err)
w.setError(err)
return
return err
}
if err != nil {
// Some other error that we don't expect. Debug and continue.
l.Debugln(err)
w.setError(err)
w.SetError(err)
continue
}
@@ -150,57 +158,49 @@ func (w *broadcastWriter) Serve() {
}
if success > 0 {
w.setError(nil)
w.SetError(nil)
}
}
}
// Stop closes the writer's UDP connection if one was opened, which
// presumably unblocks the serving loop's socket operations so it can
// exit — confirm against the Serve implementation. Safe to call even
// if the connection was never established (w.conn is nil).
func (w *broadcastWriter) Stop() {
w.connMut.Lock()
if w.conn != nil {
w.conn.Close()
}
w.connMut.Unlock()
}
// String returns a debug identifier for this writer, including its
// pointer address so concurrent instances can be told apart in logs.
func (w *broadcastWriter) String() string {
return fmt.Sprintf("broadcastWriter@%p", w)
}
type broadcastReader struct {
port int
outbox chan recv
conn *net.UDPConn
connMut sync.Mutex
errorHolder
util.ServiceWithError
port int
outbox chan recv
}
func (r *broadcastReader) Serve() {
func (r *broadcastReader) serve(stop chan struct{}) error {
l.Debugln(r, "starting")
defer l.Debugln(r, "stopping")
conn, err := net.ListenUDP("udp4", &net.UDPAddr{Port: r.port})
if err != nil {
l.Debugln(err)
r.setError(err)
return
return err
}
defer conn.Close()
r.connMut.Lock()
r.conn = conn
r.connMut.Unlock()
done := make(chan struct{})
defer close(done)
go func() {
select {
case <-stop:
case <-done:
}
conn.Close()
}()
bs := make([]byte, 65536)
for {
n, addr, err := conn.ReadFrom(bs)
if err != nil {
l.Debugln(err)
r.setError(err)
return
return err
}
r.setError(nil)
r.SetError(nil)
l.Debugf("recv %d bytes from %s", n, addr)
@@ -208,19 +208,12 @@ func (r *broadcastReader) Serve() {
copy(c, bs)
select {
case r.outbox <- recv{c, addr}:
case <-stop:
return nil
default:
l.Debugln("dropping message")
}
}
}
// Stop closes the reader's UDP connection if one was opened, which
// presumably unblocks the serving loop's blocking read so it can
// exit — confirm against the Serve implementation. Safe to call even
// if the connection was never established (r.conn is nil).
func (r *broadcastReader) Stop() {
r.connMut.Lock()
if r.conn != nil {
r.conn.Close()
}
r.connMut.Unlock()
}
func (r *broadcastReader) String() string {

View File

@@ -48,14 +48,14 @@ func NewMulticast(addr string) *Multicast {
addr: addr,
outbox: m.outbox,
}
m.mr.Service = util.AsService(m.mr.serve)
m.mr.ServiceWithError = util.AsServiceWithError(m.mr.serve)
m.Add(m.mr)
m.mw = &multicastWriter{
addr: addr,
inbox: m.inbox,
}
m.mw.Service = util.AsService(m.mw.serve)
m.mw.ServiceWithError = util.AsServiceWithError(m.mw.serve)
m.Add(m.mw)
return m
@@ -78,29 +78,35 @@ func (m *Multicast) Error() error {
}
type multicastWriter struct {
suture.Service
util.ServiceWithError
addr string
inbox <-chan []byte
errorHolder
}
func (w *multicastWriter) serve(stop chan struct{}) {
func (w *multicastWriter) serve(stop chan struct{}) error {
l.Debugln(w, "starting")
defer l.Debugln(w, "stopping")
gaddr, err := net.ResolveUDPAddr("udp6", w.addr)
if err != nil {
l.Debugln(err)
w.setError(err)
return
return err
}
conn, err := net.ListenPacket("udp6", ":0")
if err != nil {
l.Debugln(err)
w.setError(err)
return
return err
}
done := make(chan struct{})
defer close(done)
go func() {
select {
case <-stop:
case <-done:
}
conn.Close()
}()
pconn := ipv6.NewPacketConn(conn)
@@ -113,14 +119,13 @@ func (w *multicastWriter) serve(stop chan struct{}) {
select {
case bs = <-w.inbox:
case <-stop:
return
return nil
}
intfs, err := net.Interfaces()
if err != nil {
l.Debugln(err)
w.setError(err)
return
return err
}
success := 0
@@ -132,7 +137,7 @@ func (w *multicastWriter) serve(stop chan struct{}) {
if err != nil {
l.Debugln(err, "on write to", gaddr, intf.Name)
w.setError(err)
w.SetError(err)
continue
}
@@ -142,16 +147,13 @@ func (w *multicastWriter) serve(stop chan struct{}) {
select {
case <-stop:
return
return nil
default:
}
}
if success > 0 {
w.setError(nil)
} else {
l.Debugln(err)
w.setError(err)
w.SetError(nil)
}
}
}
@@ -161,35 +163,40 @@ func (w *multicastWriter) String() string {
}
type multicastReader struct {
suture.Service
util.ServiceWithError
addr string
outbox chan<- recv
errorHolder
}
func (r *multicastReader) serve(stop chan struct{}) {
func (r *multicastReader) serve(stop chan struct{}) error {
l.Debugln(r, "starting")
defer l.Debugln(r, "stopping")
gaddr, err := net.ResolveUDPAddr("udp6", r.addr)
if err != nil {
l.Debugln(err)
r.setError(err)
return
return err
}
conn, err := net.ListenPacket("udp6", r.addr)
if err != nil {
l.Debugln(err)
r.setError(err)
return
return err
}
done := make(chan struct{})
defer close(done)
go func() {
select {
case <-stop:
case <-done:
}
conn.Close()
}()
intfs, err := net.Interfaces()
if err != nil {
l.Debugln(err)
r.setError(err)
return
return err
}
pconn := ipv6.NewPacketConn(conn)
@@ -206,16 +213,20 @@ func (r *multicastReader) serve(stop chan struct{}) {
if joined == 0 {
l.Debugln("no multicast interfaces available")
r.setError(errors.New("no multicast interfaces available"))
return
return errors.New("no multicast interfaces available")
}
bs := make([]byte, 65536)
for {
select {
case <-stop:
return nil
default:
}
n, _, addr, err := pconn.ReadFrom(bs)
if err != nil {
l.Debugln(err)
r.setError(err)
r.SetError(err)
continue
}
l.Debugf("recv %d bytes from %s", n, addr)
@@ -224,8 +235,6 @@ func (r *multicastReader) serve(stop chan struct{}) {
copy(c, bs)
select {
case r.outbox <- recv{c, addr}:
case <-stop:
return
default:
l.Debugln("dropping message")
}

View File

@@ -11,7 +11,9 @@ package fs
import "github.com/syncthing/notify"
const (
subEventMask = notify.NoteDelete | notify.NoteWrite | notify.NoteRename
permEventMask = notify.NoteAttrib
// Platform independent notify.Create is required, as kqueue does not have
// any event signalling file creation, but notify does generate those internally.
subEventMask = notify.NoteDelete | notify.NoteWrite | notify.NoteRename | notify.Create
permEventMask = notify.NoteAttrib | notify.NoteExtend
rmEventMask = notify.NoteDelete | notify.NoteRename
)

View File

@@ -423,6 +423,12 @@ func (f *folder) scanSubdirs(subDirs []string) error {
var iterError error
f.fset.WithPrefixedHaveTruncated(protocol.LocalDeviceID, sub, func(fi db.FileIntf) bool {
select {
case <-f.ctx.Done():
return false
default:
}
file := fi.(db.FileInfoTruncated)
if err := batch.flushIfFull(); err != nil {
@@ -507,6 +513,12 @@ func (f *folder) scanSubdirs(subDirs []string) error {
return true
})
select {
case <-f.ctx.Done():
return f.ctx.Err()
default:
}
if iterError == nil && len(toIgnore) > 0 {
for _, file := range toIgnore {
l.Debugln("marking file as ignored", f)

View File

@@ -68,7 +68,7 @@ func (f *sendOnlyFolder) pull() bool {
curFile, ok := f.fset.Get(protocol.LocalDeviceID, intf.FileName())
if !ok {
if intf.IsDeleted() {
panic("Should never get a deleted file as needed when we don't have it")
l.Debugln("Should never get a deleted file as needed when we don't have it")
}
return true
}

View File

@@ -248,10 +248,8 @@ func (m *model) StartFolder(folder string) {
// Need to hold lock on m.fmut when calling this.
func (m *model) startFolderLocked(cfg config.FolderConfiguration) {
if err := m.checkFolderRunningLocked(cfg.ID); err == errFolderMissing {
l.Warnln("Cannot start nonexistent folder", cfg.Description())
panic("cannot start nonexistent folder")
} else if err == nil {
_, ok := m.folderRunners[cfg.ID]
if ok {
l.Warnln("Cannot start already running folder", cfg.Description())
panic("cannot start already running folder")
}
@@ -461,18 +459,16 @@ func (m *model) RestartFolder(from, to config.FolderConfiguration) {
errMsg = "restarting"
}
var fset *db.FileSet
if !to.Paused {
// Creating the fileset can take a long time (metadata calculation)
// so we do it outside of the lock.
fset = db.NewFileSet(to.ID, to.Filesystem(), m.db)
}
m.fmut.Lock()
defer m.fmut.Unlock()
m.tearDownFolderLocked(from, fmt.Errorf("%v folder %v", errMsg, to.Description()))
if !to.Paused {
// Creating the fileset can take a long time (metadata calculation)
// so we do it outside of the lock.
m.fmut.Unlock()
fset := db.NewFileSet(to.ID, to.Filesystem(), m.db)
m.fmut.Lock()
m.addFolderLocked(to, fset)
m.startFolderLocked(to)
}
@@ -847,6 +843,10 @@ func (m *model) NeedFolderFiles(folder string, page, perpage int) ([]db.FileInfo
if runnerOk {
progressNames, queuedNames, skipped := runner.Jobs(page, perpage)
progress = make([]db.FileInfoTruncated, len(progressNames))
queued = make([]db.FileInfoTruncated, len(queuedNames))
seen = make(map[string]struct{}, len(progressNames)+len(queuedNames))
for i, name := range progressNames {
if f, ok := rf.GetGlobalTruncated(name); ok {
progress[i] = f
@@ -2238,7 +2238,7 @@ func (m *model) WatchError(folder string) error {
m.fmut.RLock()
defer m.fmut.RUnlock()
if err := m.checkFolderRunningLocked(folder); err != nil {
return err
return nil // If the folder isn't running, there's no error to report.
}
return m.folderRunners[folder].WatchError()
}
@@ -2549,6 +2549,7 @@ func (m *model) CommitConfiguration(from, to config.Configuration) bool {
if toCfg.Paused {
l.Infoln("Pausing", deviceID)
m.closeConn(deviceID, errDevicePaused)
events.Default.Log(events.DevicePaused, map[string]string{"device": deviceID.String()})
} else {
events.Default.Log(events.DeviceResumed, map[string]string{"device": deviceID.String()})

View File

@@ -26,6 +26,7 @@ import (
"github.com/syncthing/syncthing/lib/config"
"github.com/syncthing/syncthing/lib/db"
"github.com/syncthing/syncthing/lib/events"
"github.com/syncthing/syncthing/lib/fs"
"github.com/syncthing/syncthing/lib/ignore"
"github.com/syncthing/syncthing/lib/osutil"
@@ -3303,3 +3304,31 @@ func TestConnCloseOnRestart(t *testing.T) {
t.Fatal("Timed out before connection was closed")
}
}
// TestDevicePause verifies that pausing a device in the configuration
// emits a DevicePaused event and closes the open connection to that
// device (regression test; see fixes #5873 / PR #5874).
func TestDevicePause(t *testing.T) {
// Subscribe before pausing so the DevicePaused event cannot be missed.
sub := events.Default.Subscribe(events.DevicePaused)
defer events.Default.Unsubscribe(sub)
m, _, fcfg := setupModelWithConnection()
defer cleanupModelAndRemoveDir(m, fcfg.Filesystem().URI())
// Grab the channel that is closed when the connection to device1 closes.
m.pmut.RLock()
closed := m.closed[device1]
m.pmut.RUnlock()
// Pause device1 via a config change; the model reacts in CommitConfiguration.
dev := m.cfg.Devices()[device1]
dev.Paused = true
m.cfg.SetDevice(dev)
timeout := time.NewTimer(5 * time.Second)
select {
case <-sub.C():
// Event seen; the connection must also be closed promptly.
select {
case <-closed:
case <-timeout.C:
t.Fatal("Timed out before connection was closed")
}
case <-timeout.C:
t.Fatal("Timed out before device was paused")
}
}

View File

@@ -13,6 +13,7 @@ import (
"os"
"path/filepath"
"runtime"
"strconv"
"strings"
"testing"
"time"
@@ -947,3 +948,46 @@ func TestRequestDeleteChanged(t *testing.T) {
}
}
}
// TestNeedFolderFiles verifies that NeedFolderFiles returns the expected
// number of needed items overall and per page, while syncing is blocked so
// the items stay needed (regression test; see fixes #5872 / PR #5875).
func TestNeedFolderFiles(t *testing.T) {
m, fc, fcfg := setupModelWithConnection()
tfs := fcfg.Filesystem()
tmpDir := tfs.URI()
defer cleanupModelAndRemoveDir(m, tmpDir)
sub := events.Default.Subscribe(events.RemoteIndexUpdated)
defer events.Default.Unsubscribe(sub)
// Make every block request fail so no file can actually sync and all
// announced files remain in the "needed" set.
errPreventSync := errors.New("you aren't getting any of this")
fc.mut.Lock()
fc.requestFn = func(string, string, int64, int, []byte, bool) ([]byte, error) {
return nil, errPreventSync
}
fc.mut.Unlock()
// Announce 20 small files from the fake remote device.
data := []byte("foo")
num := 20
for i := 0; i < num; i++ {
fc.addFile(strconv.Itoa(i), 0644, protocol.FileInfoTypeFile, data)
}
fc.sendIndexUpdate()
select {
case <-sub.C():
case <-time.After(5 * time.Second):
t.Fatal("Timed out before receiving index")
}
// A page large enough for everything must return all needed items.
progress, queued, rest := m.NeedFolderFiles(fcfg.ID, 1, 100)
if got := len(progress) + len(queued) + len(rest); got != num {
t.Errorf("Got %v needed items, expected %v", got, num)
}
// Paging with perpage=10 must return exactly 10 items on each of the
// first two pages.
exp := 10
for page := 1; page < 3; page++ {
progress, queued, rest := m.NeedFolderFiles(fcfg.ID, page, exp)
if got := len(progress) + len(queued) + len(rest); got != exp {
t.Errorf("Got %v needed items on page %v, expected %v", got, page, exp)
}
}
}

View File

@@ -19,7 +19,7 @@ func Register(provider DiscoverFunc) {
providers = append(providers, provider)
}
func discoverAll(renewal, timeout time.Duration) map[string]Device {
func discoverAll(renewal, timeout time.Duration, stop chan struct{}) map[string]Device {
wg := &sync.WaitGroup{}
wg.Add(len(providers))
@@ -28,20 +28,32 @@ func discoverAll(renewal, timeout time.Duration) map[string]Device {
for _, discoverFunc := range providers {
go func(f DiscoverFunc) {
defer wg.Done()
for _, dev := range f(renewal, timeout) {
c <- dev
select {
case c <- dev:
case <-stop:
return
}
}
wg.Done()
}(discoverFunc)
}
nats := make(map[string]Device)
go func() {
for dev := range c {
nats[dev.ID()] = dev
defer close(done)
for {
select {
case dev, ok := <-c:
if !ok {
return
}
nats[dev.ID()] = dev
case <-stop:
return
}
}
close(done)
}()
wg.Wait()

View File

@@ -14,17 +14,21 @@ import (
stdsync "sync"
"time"
"github.com/thejerf/suture"
"github.com/syncthing/syncthing/lib/config"
"github.com/syncthing/syncthing/lib/protocol"
"github.com/syncthing/syncthing/lib/sync"
"github.com/syncthing/syncthing/lib/util"
)
// Service runs a loop for discovery of IGDs (Internet Gateway Devices) and
// setup/renewal of a port mapping.
type Service struct {
id protocol.DeviceID
cfg config.Wrapper
stop chan struct{}
suture.Service
id protocol.DeviceID
cfg config.Wrapper
mappings []*Mapping
timer *time.Timer
@@ -32,27 +36,28 @@ type Service struct {
}
func NewService(id protocol.DeviceID, cfg config.Wrapper) *Service {
return &Service{
s := &Service{
id: id,
cfg: cfg,
timer: time.NewTimer(0),
mut: sync.NewRWMutex(),
}
s.Service = util.AsService(s.serve)
return s
}
func (s *Service) Serve() {
func (s *Service) serve(stop chan struct{}) {
announce := stdsync.Once{}
s.mut.Lock()
s.timer.Reset(0)
s.stop = make(chan struct{})
s.mut.Unlock()
for {
select {
case <-s.timer.C:
if found := s.process(); found != -1 {
if found := s.process(stop); found != -1 {
announce.Do(func() {
suffix := "s"
if found == 1 {
@@ -61,7 +66,7 @@ func (s *Service) Serve() {
l.Infoln("Detected", found, "NAT service"+suffix)
})
}
case <-s.stop:
case <-stop:
s.timer.Stop()
s.mut.RLock()
for _, mapping := range s.mappings {
@@ -73,7 +78,7 @@ func (s *Service) Serve() {
}
}
func (s *Service) process() int {
func (s *Service) process(stop chan struct{}) int {
// toRenew are mappings which are due for renewal
// toUpdate are the remaining mappings, which will only be updated if one of
// the old IGDs has gone away, or a new IGD has appeared, but only if we
@@ -115,25 +120,19 @@ func (s *Service) process() int {
return -1
}
nats := discoverAll(time.Duration(s.cfg.Options().NATRenewalM)*time.Minute, time.Duration(s.cfg.Options().NATTimeoutS)*time.Second)
nats := discoverAll(time.Duration(s.cfg.Options().NATRenewalM)*time.Minute, time.Duration(s.cfg.Options().NATTimeoutS)*time.Second, stop)
for _, mapping := range toRenew {
s.updateMapping(mapping, nats, true)
s.updateMapping(mapping, nats, true, stop)
}
for _, mapping := range toUpdate {
s.updateMapping(mapping, nats, false)
s.updateMapping(mapping, nats, false, stop)
}
return len(nats)
}
func (s *Service) Stop() {
s.mut.RLock()
close(s.stop)
s.mut.RUnlock()
}
func (s *Service) NewMapping(protocol Protocol, ip net.IP, port int) *Mapping {
mapping := &Mapping{
protocol: protocol,
@@ -178,17 +177,17 @@ func (s *Service) RemoveMapping(mapping *Mapping) {
// acquire mappings for natds which the mapping was unaware of before.
// Optionally takes renew flag which indicates whether or not we should renew
// mappings with existing natds
func (s *Service) updateMapping(mapping *Mapping, nats map[string]Device, renew bool) {
func (s *Service) updateMapping(mapping *Mapping, nats map[string]Device, renew bool, stop chan struct{}) {
var added, removed []Address
renewalTime := time.Duration(s.cfg.Options().NATRenewalM) * time.Minute
mapping.expires = time.Now().Add(renewalTime)
newAdded, newRemoved := s.verifyExistingMappings(mapping, nats, renew)
newAdded, newRemoved := s.verifyExistingMappings(mapping, nats, renew, stop)
added = append(added, newAdded...)
removed = append(removed, newRemoved...)
newAdded, newRemoved = s.acquireNewMappings(mapping, nats)
newAdded, newRemoved = s.acquireNewMappings(mapping, nats, stop)
added = append(added, newAdded...)
removed = append(removed, newRemoved...)
@@ -197,12 +196,18 @@ func (s *Service) updateMapping(mapping *Mapping, nats map[string]Device, renew
}
}
func (s *Service) verifyExistingMappings(mapping *Mapping, nats map[string]Device, renew bool) ([]Address, []Address) {
func (s *Service) verifyExistingMappings(mapping *Mapping, nats map[string]Device, renew bool, stop chan struct{}) ([]Address, []Address) {
var added, removed []Address
leaseTime := time.Duration(s.cfg.Options().NATLeaseM) * time.Minute
for id, address := range mapping.addressMap() {
select {
case <-stop:
return nil, nil
default:
}
// Delete addresses for NATDevice's that do not exist anymore
nat, ok := nats[id]
if !ok {
@@ -220,7 +225,7 @@ func (s *Service) verifyExistingMappings(mapping *Mapping, nats map[string]Devic
l.Debugf("Renewing %s -> %s mapping on %s", mapping, address, id)
addr, err := s.tryNATDevice(nat, mapping.address.Port, address.Port, leaseTime)
addr, err := s.tryNATDevice(nat, mapping.address.Port, address.Port, leaseTime, stop)
if err != nil {
l.Debugf("Failed to renew %s -> mapping on %s", mapping, address, id)
mapping.removeAddress(id)
@@ -242,13 +247,19 @@ func (s *Service) verifyExistingMappings(mapping *Mapping, nats map[string]Devic
return added, removed
}
func (s *Service) acquireNewMappings(mapping *Mapping, nats map[string]Device) ([]Address, []Address) {
func (s *Service) acquireNewMappings(mapping *Mapping, nats map[string]Device, stop chan struct{}) ([]Address, []Address) {
var added, removed []Address
leaseTime := time.Duration(s.cfg.Options().NATLeaseM) * time.Minute
addrMap := mapping.addressMap()
for id, nat := range nats {
select {
case <-stop:
return nil, nil
default:
}
if _, ok := addrMap[id]; ok {
continue
}
@@ -263,7 +274,7 @@ func (s *Service) acquireNewMappings(mapping *Mapping, nats map[string]Device) (
l.Debugf("Acquiring %s mapping on %s", mapping, id)
addr, err := s.tryNATDevice(nat, mapping.address.Port, 0, leaseTime)
addr, err := s.tryNATDevice(nat, mapping.address.Port, 0, leaseTime, stop)
if err != nil {
l.Debugf("Failed to acquire %s mapping on %s", mapping, id)
continue
@@ -280,7 +291,7 @@ func (s *Service) acquireNewMappings(mapping *Mapping, nats map[string]Device) (
// tryNATDevice tries to acquire a port mapping for the given internal address to
// the given external port. If external port is 0, picks a pseudo-random port.
func (s *Service) tryNATDevice(natd Device, intPort, extPort int, leaseTime time.Duration) (Address, error) {
func (s *Service) tryNATDevice(natd Device, intPort, extPort int, leaseTime time.Duration, stop chan struct{}) (Address, error) {
var err error
var port int
@@ -301,6 +312,12 @@ func (s *Service) tryNATDevice(natd Device, intPort, extPort int, leaseTime time
}
for i := 0; i < 10; i++ {
select {
case <-stop:
return Address{}, nil
default:
}
// Then try up to ten random ports.
extPort = 1024 + predictableRand.Intn(65535-1024)
name := fmt.Sprintf("syncthing-%d", extPort)

View File

@@ -69,15 +69,7 @@ func (c *dynamicClient) serve(stop chan struct{}) error {
addrs = append(addrs, ruri.String())
}
defer func() {
c.mut.RLock()
if c.client != nil {
c.client.Stop()
}
c.mut.RUnlock()
}()
for _, addr := range relayAddressesOrder(addrs) {
for _, addr := range relayAddressesOrder(addrs, stop) {
select {
case <-stop:
l.Debugln(c, "stopping")
@@ -104,6 +96,15 @@ func (c *dynamicClient) serve(stop chan struct{}) error {
return fmt.Errorf("could not find a connectable relay")
}
// Stop stops the currently active inner relay client, if any, and then
// stops the embedded commonClient service. The inner client is stopped
// first so its connection attempt cannot outlive the common service.
func (c *dynamicClient) Stop() {
c.mut.RLock()
if c.client != nil {
c.client.Stop()
}
c.mut.RUnlock()
c.commonClient.Stop()
}
func (c *dynamicClient) Error() error {
c.mut.RLock()
defer c.mut.RUnlock()
@@ -147,7 +148,7 @@ type dynamicAnnouncement struct {
// the closest 50ms, and puts them in buckets of 50ms latency ranges. Then
// shuffles each bucket, and returns all addresses starting with the ones from
// the lowest latency bucket, ending with the highest latency bucket.
func relayAddressesOrder(input []string) []string {
func relayAddressesOrder(input []string, stop chan struct{}) []string {
buckets := make(map[int][]string)
for _, relay := range input {
@@ -159,6 +160,12 @@ func relayAddressesOrder(input []string) []string {
id := int(latency/time.Millisecond) / 50
buckets[id] = append(buckets[id], relay)
select {
case <-stop:
return nil
default:
}
}
var ids []int

View File

@@ -109,8 +109,8 @@ func New(cfg config.Wrapper, subscriber Subscriber, conn net.PacketConn) (*Servi
}
func (s *Service) Stop() {
s.Service.Stop()
_ = s.stunConn.Close()
s.Service.Stop()
}
func (s *Service) serve(stop chan struct{}) {
@@ -163,7 +163,11 @@ func (s *Service) serve(stop chan struct{}) {
// We failed to contact all provided stun servers or the nat is not punchable.
// Chillout for a while.
time.Sleep(stunRetryInterval)
select {
case <-time.After(stunRetryInterval):
case <-stop:
return
}
}
}

View File

@@ -10,42 +10,38 @@ import (
"encoding/json"
"io"
"github.com/thejerf/suture"
"github.com/syncthing/syncthing/lib/events"
"github.com/syncthing/syncthing/lib/util"
)
// The auditService subscribes to events and writes these in JSON format, one
// event per line, to the specified writer.
type auditService struct {
w io.Writer // audit destination
stop chan struct{} // signals time to stop
started chan struct{} // signals startup complete
stopped chan struct{} // signals stop complete
suture.Service
w io.Writer // audit destination
sub *events.Subscription
}
func newAuditService(w io.Writer) *auditService {
return &auditService{
w: w,
stop: make(chan struct{}),
started: make(chan struct{}),
stopped: make(chan struct{}),
s := &auditService{
w: w,
sub: events.Default.Subscribe(events.AllEvents),
}
s.Service = util.AsService(s.serve)
return s
}
// Serve runs the audit service.
func (s *auditService) Serve() {
defer close(s.stopped)
sub := events.Default.Subscribe(events.AllEvents)
defer events.Default.Unsubscribe(sub)
// serve runs the audit service.
func (s *auditService) serve(stop chan struct{}) {
enc := json.NewEncoder(s.w)
// We're ready to start processing events.
close(s.started)
for {
select {
case ev := <-sub.C():
case ev := <-s.sub.C():
enc.Encode(ev)
case <-s.stop:
case <-stop:
return
}
}
@@ -53,17 +49,6 @@ func (s *auditService) Serve() {
// Stop stops the audit service.
func (s *auditService) Stop() {
close(s.stop)
}
// WaitForStart returns once the audit service is ready to receive events, or
// immediately if it's already running.
func (s *auditService) WaitForStart() {
<-s.started
}
// WaitForStop returns once the audit service has stopped.
// (Needed by the tests.)
func (s *auditService) WaitForStop() {
<-s.stopped
s.Service.Stop()
events.Default.Unsubscribe(s.sub)
}

View File

@@ -17,13 +17,12 @@ import (
func TestAuditService(t *testing.T) {
buf := new(bytes.Buffer)
service := newAuditService(buf)
// Event sent before start, will not be logged
// Event sent before construction, will not be logged
events.Default.Log(events.ConfigSaved, "the first event")
service := newAuditService(buf)
go service.Serve()
service.WaitForStart()
// Event that should end up in the audit log
events.Default.Log(events.ConfigSaved, "the second event")
@@ -32,7 +31,6 @@ func TestAuditService(t *testing.T) {
time.Sleep(10 * time.Millisecond)
service.Stop()
service.WaitForStop()
// This event should not be logged, since we have stopped.
events.Default.Log(events.ConfigSaved, "the third event")

View File

@@ -121,12 +121,8 @@ func (a *App) startup() error {
})
a.mainService.ServeBackground()
// Set a log prefix similar to the ID we will have later on, or early log
// lines look ugly.
l.SetPrefix("[start] ")
if a.opts.AuditWriter != nil {
a.startAuditing()
a.mainService.Add(newAuditService(a.opts.AuditWriter))
}
if a.opts.Verbose {
@@ -147,10 +143,9 @@ func (a *App) startup() error {
// report the error if there is one.
osutil.MaximizeOpenFileLimit()
// Figure out our device ID, set it as the log prefix and log it.
a.myID = protocol.NewDeviceID(a.cert.Certificate[0])
l.SetPrefix(fmt.Sprintf("[%s] ", a.myID.String()[:5]))
l.Infoln(build.LongVersion)
l.Infoln("My ID:", a.myID)
// Select SHA256 implementation and report. Affected by the
@@ -407,26 +402,17 @@ func (a *App) Stop(stopReason ExitStatus) ExitStatus {
case <-a.stopped:
case <-a.stop:
default:
// ExitSuccess is the default value for a.exitStatus. If another status
// was already set, ignore the stop reason given as argument to Stop.
if a.exitStatus == ExitSuccess {
a.exitStatus = stopReason
}
close(a.stop)
}
<-a.stopped
// ExitSuccess is the default value for a.exitStatus. If another status
// was already set, ignore the stop reason given as argument to Stop.
if a.exitStatus == ExitSuccess {
a.exitStatus = stopReason
<-a.stopped
}
return a.exitStatus
}
// startAuditing creates the audit service writing to the configured
// audit writer, registers it with the main service supervisor, and
// blocks until the service is ready to receive events.
func (a *App) startAuditing() {
auditService := newAuditService(a.opts.AuditWriter)
a.mainService.Add(auditService)
// We wait for the audit service to fully start before we return, to
// ensure we capture all events from the start.
auditService.WaitForStart()
}
func (a *App) setupGUI(m model.Model, defaultSub, diskSub events.BufferedSubscription, discoverer discover.CachingMux, connectionsService connections.Service, urService *ur.Service, errors, systemLog logger.Recorder) error {
guiCfg := a.cfg.GUI()

View File

@@ -9,45 +9,37 @@ package syncthing
import (
"fmt"
"github.com/thejerf/suture"
"github.com/syncthing/syncthing/lib/events"
"github.com/syncthing/syncthing/lib/util"
)
// The verbose logging service subscribes to events and prints these in
// verbose format to the console using INFO level.
type verboseService struct {
stop chan struct{} // signals time to stop
started chan struct{} // signals startup complete
suture.Service
sub *events.Subscription
}
func newVerboseService() *verboseService {
return &verboseService{
stop: make(chan struct{}),
started: make(chan struct{}),
s := &verboseService{
sub: events.Default.Subscribe(events.AllEvents),
}
s.Service = util.AsService(s.serve)
return s
}
// Serve runs the verbose logging service.
func (s *verboseService) Serve() {
sub := events.Default.Subscribe(events.AllEvents)
defer events.Default.Unsubscribe(sub)
select {
case <-s.started:
// The started channel has already been closed; do nothing.
default:
// This is the first time around. Indicate that we're ready to start
// processing events.
close(s.started)
}
// serve runs the verbose logging service.
func (s *verboseService) serve(stop chan struct{}) {
for {
select {
case ev := <-sub.C():
case ev := <-s.sub.C():
formatted := s.formatEvent(ev)
if formatted != "" {
l.Verboseln(formatted)
}
case <-s.stop:
case <-stop:
return
}
}
@@ -55,13 +47,9 @@ func (s *verboseService) Serve() {
// Stop stops the verbose logging service.
func (s *verboseService) Stop() {
close(s.stop)
}
s.Service.Stop()
events.Default.Unsubscribe(s.sub)
// WaitForStart returns once the verbose logging service is ready to receive
// events, or immediately if it's already running.
func (s *verboseService) WaitForStart() {
<-s.started
}
func (s *verboseService) formatEvent(ev events.Event) string {

View File

@@ -187,6 +187,7 @@ func AsService(fn func(stop chan struct{})) suture.Service {
type ServiceWithError interface {
suture.Service
Error() error
SetError(error)
}
// AsServiceWithError does the same as AsService, except that it keeps track
@@ -244,3 +245,9 @@ func (s *service) Error() error {
defer s.mut.Unlock()
return s.err
}
// SetError records err as the service's current error under the mutex;
// passing nil clears a previously recorded error. Counterpart to Error.
func (s *service) SetError(err error) {
s.mut.Lock()
s.err = err
s.mut.Unlock()
}