Compare commits

...

28 Commits

Author SHA1 Message Date
Jakob Borg
15b87ae297 Merge pull request #1704 from jarlebring/upnp_caps
Fix capitalization in HTTP-header in SOAP request (fixes #1696)
2015-04-26 20:10:14 +09:00
Jakob Borg
02fdf59839 Add jarlebring 2015-04-26 19:40:02 +09:00
jarlebring
d9da02b7a8 Formatting with gofmt 2015-04-26 12:37:37 +02:00
Elias Jarlebring
8f2ad6418d Fix capitalization in HTTP-header in SOAP request (fixes #1696)
Some routers are sensitive to the capitalization of "SOAPAction" in the HTTP header of a SOAP request. This modification follows the recommendation for preserving capitalization in HTTP headers in Go, described at http://stackoverflow.com/questions/26351716/how-to-keep-key-case-sensitive-in-request-header-using-golang?lq=1
2015-04-26 12:16:40 +02:00
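For illustration only (this is not the PR's exact change): Go's net/http canonicalizes header names set through Header.Set, so "SOAPAction" would go out on the wire as "Soapaction". Writing to the header map directly bypasses canonicalization and preserves the capitalization that such routers expect. The control URL and URN below are made up.

package main

import (
	"fmt"
	"net/http"
)

func main() {
	// Hypothetical UPnP control URL, for demonstration only.
	req, _ := http.NewRequest("POST", "http://192.168.1.1:5000/ctl/IPConn", nil)

	// Header.Set canonicalizes the key: "SOAPAction" becomes "Soapaction".
	req.Header.Set("SOAPAction", `"urn:schemas-upnp-org:service:WANIPConnection:1#AddPortMapping"`)
	fmt.Println(req.Header) // map[Soapaction:[...]]

	// Assigning to the underlying map directly preserves the exact case.
	delete(req.Header, "Soapaction")
	req.Header["SOAPAction"] = []string{`"urn:schemas-upnp-org:service:WANIPConnection:1#AddPortMapping"`}
	fmt.Println(req.Header) // map[SOAPAction:[...]]
}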
Jakob Borg
ff984425a3 Merge pull request #1703 from AudriusButkevicius/page
Add pagination to Out of sync item list (fixes #1509)
2015-04-26 18:42:17 +09:00
Audrius Butkevicius
ac1058359f Rebuild assets 2015-04-26 00:22:30 +01:00
Audrius Butkevicius
9afbca3001 Add pagination to Out of sync item list (fixes #1509) 2015-04-26 00:22:26 +01:00
Audrius Butkevicius
ec3f17cb9c Add angular-dirPagination 2015-04-25 22:52:52 +01:00
Audrius Butkevicius
73b9d5c5f9 Merge pull request #1698 from calmh/pull-order
Configurable file pull order (alphabetic, random, by size or age)
2015-04-25 15:41:02 +01:00
Audrius Butkevicius
ecc8591c95 Merge pull request #1699 from calmh/connsvc
Break out connection handling into a service
2015-04-25 15:37:08 +01:00
Audrius Butkevicius
696b67e4b1 Merge pull request #1697 from calmh/auditsvc
Add audit log feature
2015-04-25 15:34:34 +01:00
Jakob Borg
266a5116a1 Break out connection handling into a service 2015-04-25 23:21:42 +09:00
Jakob Borg
131f2be857 Add audit log feature 2015-04-25 23:20:39 +09:00
Jakob Borg
be7b3a9952 Configurable file pull order (alphabetic, random, by size or age) 2015-04-25 23:20:21 +09:00
Jakob Borg
bb31b1785b Add a service manager to main (future use) 2015-04-25 23:16:46 +09:00
Jakob Borg
2a60f4b1e9 Add .gitattributes; normalize line endings 2015-04-25 23:16:46 +09:00
Jakob Borg
33a4fb5a1a Fix folder check tests 2015-04-25 23:16:46 +09:00
Audrius Butkevicius
aece6e8b6c Merge pull request #1689 from calmh/nolocks
events.Subscription.Poll does not seem to require locking
2015-04-24 10:26:58 +01:00
Jakob Borg
7bf55dd14f events.Subscription.Poll does not seem to require locking
This is a large source of output from the new lock logging, and it
doesn't seem to accomplish anything useful that I can see. Running
integration with the race detector to make sure...
2015-04-24 11:25:42 +09:00
Jakob Borg
e158f17c2b Adjust sync test intervals to be less latency sensitive 2015-04-24 11:25:24 +09:00
Jakob Borg
c5027d9478 Merge branch 'pr-1688'
* pr-1688:
  Minor fixup
  Add tests, fix getCaller, replace wg.Done with wg.Wait
2015-04-24 09:43:52 +09:00
Jakob Borg
36c1d82146 Minor fixup 2015-04-24 09:43:40 +09:00
Audrius Butkevicius
bd4f404d45 Add tests, fix getCaller, replace wg.Done with wg.Wait 2015-04-23 20:09:14 +01:00
Jakob Borg
43d39844f7 Merge pull request #1685 from AudriusButkevicius/mut
Add mutex logging
2015-04-23 21:16:23 +09:00
Audrius Butkevicius
e041a4d212 Track RUnlockers while locking a RWMutex 2015-04-23 11:29:23 +01:00
Audrius Butkevicius
433b923ea7 Add mutex logging 2015-04-23 10:54:14 +01:00
Audrius Butkevicius
f8f1c72b44 Merge pull request #1686 from calmh/major-upgrade-v11
Allow major upgrades
2015-04-23 09:31:58 +01:00
Jakob Borg
542716e216 Allow major upgrades 2015-04-23 17:13:11 +09:00
57 changed files with 1931 additions and 257 deletions

9
.gitattributes vendored Normal file
View File

@@ -0,0 +1,9 @@
# Text files use LF line endings in this repository
* text=auto
# Except the dependencies, which we leave alone
Godeps/** -text=auto
# Diffs on these files are meaningless
gui.files.go -diff
*.svg -diff

View File

@@ -18,6 +18,7 @@ Colin Kennedy <moshen.colin@gmail.com>
Daniel Martí <mvdan@mvdan.cc>
Dennis Wilson <dw@risu.io>
Dominik Heidler <dominik@heidler.eu>
Elias Jarlebring <jarlebring@gmail.com>
Emil Hessman <emil@hessman.se>
Federico Castagnini <federico.castagnini@gmail.com>
Felix Ableitner <me@nutomic.com>

1
NICKS
View File

@@ -24,6 +24,7 @@ facastagnini <federico.castagnini@gmail.com>
filoozoom <philippe@schommers.be>
frioux <frew@afoolishmanifesto.com> <frioux@gmail.com>
gillisig <gilli@vx.is>
jarlebring <jarlebring@gmail.com>
jedie <github.com@jensdiemer.de> <git@jensdiemer.de>
jpjp <jamespatterson@operamail.com> <jpjp@users.noreply.github.com>
kamadak <kamada@nanohz.org>

View File

Binary image file changed (3.7 KiB → 3.6 KiB).

View File

Binary image file changed (1.8 KiB → 1.7 KiB).

View File

Binary image file changed (3.8 KiB → 3.7 KiB).

69
cmd/syncthing/audit.go Normal file
View File

@@ -0,0 +1,69 @@
// Copyright (C) 2015 The Syncthing Authors.
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
// You can obtain one at http://mozilla.org/MPL/2.0/.
package main
import (
"encoding/json"
"io"
"github.com/syncthing/syncthing/internal/events"
)
// The auditSvc subscribes to events and writes these in JSON format, one
// event per line, to the specified writer.
type auditSvc struct {
w io.Writer // audit destination
stop chan struct{} // signals time to stop
started chan struct{} // signals startup complete
stopped chan struct{} // signals stop complete
}
func newAuditSvc(w io.Writer) *auditSvc {
return &auditSvc{
w: w,
stop: make(chan struct{}),
started: make(chan struct{}),
stopped: make(chan struct{}),
}
}
// Serve runs the audit service.
func (s *auditSvc) Serve() {
defer close(s.stopped)
sub := events.Default.Subscribe(events.AllEvents)
defer events.Default.Unsubscribe(sub)
enc := json.NewEncoder(s.w)
// We're ready to start processing events.
close(s.started)
for {
select {
case ev := <-sub.C():
enc.Encode(ev)
case <-s.stop:
return
}
}
}
// Stop stops the audit service.
func (s *auditSvc) Stop() {
close(s.stop)
}
// WaitForStart returns once the audit service is ready to receive events, or
// immediately if it's already running.
func (s *auditSvc) WaitForStart() {
<-s.started
}
// WaitForStop returns once the audit service has stopped.
// (Needed by the tests.)
func (s *auditSvc) WaitForStop() {
<-s.stopped
}
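A minimal usage sketch, assuming os.Stdout as the writer; the real wiring, via startAuditing() and the main supervisor, appears in the main.go diff further down.

// Hypothetical standalone usage of the service defined above.
svc := newAuditSvc(os.Stdout)
go svc.Serve()
svc.WaitForStart() // events logged from here on are captured
events.Default.Log(events.Ping, "audit me")
svc.Stop()
svc.WaitForStop()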

View File

@@ -0,0 +1,54 @@
// Copyright (C) 2015 The Syncthing Authors.
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
// You can obtain one at http://mozilla.org/MPL/2.0/.
package main
import (
"bytes"
"strings"
"testing"
"time"
"github.com/syncthing/syncthing/internal/events"
)
func TestAuditService(t *testing.T) {
buf := new(bytes.Buffer)
svc := newAuditSvc(buf)
// Event sent before start, will not be logged
events.Default.Log(events.Ping, "the first event")
go svc.Serve()
svc.WaitForStart()
// Event that should end up in the audit log
events.Default.Log(events.Ping, "the second event")
// We need to give the events time to arrive, since the channels are buffered etc.
time.Sleep(10 * time.Millisecond)
svc.Stop()
svc.WaitForStop()
// This event should not be logged, since we have stopped.
events.Default.Log(events.Ping, "the third event")
result := string(buf.Bytes())
t.Log(result)
if strings.Contains(result, "first event") {
t.Error("Unexpected first event")
}
if !strings.Contains(result, "second event") {
t.Error("Missing second event")
}
if strings.Contains(result, "third event") {
t.Error("Missing third event")
}
}

View File

@@ -15,23 +15,84 @@ import (
"time"
"github.com/syncthing/protocol"
"github.com/syncthing/syncthing/internal/config"
"github.com/syncthing/syncthing/internal/events"
"github.com/syncthing/syncthing/internal/model"
"github.com/thejerf/suture"
)
func listenConnect(myID protocol.DeviceID, m *model.Model, tlsCfg *tls.Config) {
var conns = make(chan *tls.Conn)
// The connection service listens on TLS and dials configured unconnected
// devices. Successful connections are handed to the model.
type connectionSvc struct {
*suture.Supervisor
cfg *config.Wrapper
myID protocol.DeviceID
model *model.Model
tlsCfg *tls.Config
conns chan *tls.Conn
}
// Listen
for _, addr := range cfg.Options().ListenAddress {
go listenTLS(conns, addr, tlsCfg)
func newConnectionSvc(cfg *config.Wrapper, myID protocol.DeviceID, model *model.Model, tlsCfg *tls.Config) *connectionSvc {
svc := &connectionSvc{
Supervisor: suture.NewSimple("connectionSvc"),
cfg: cfg,
myID: myID,
model: model,
tlsCfg: tlsCfg,
conns: make(chan *tls.Conn),
}
// Connect
go dialTLS(m, conns, tlsCfg)
// There are several moving parts here; one routine per listening address
// to handle incoming connections, one routine to periodically attempt
// outgoing connections, and lastly one routine to do the common handling
// regardless of whether the connection was incoming or outgoing. It ends
// up as in the diagram below. We embed a Supervisor to manage the
// routines (i.e. log and restart if they crash or exit, etc).
//
//                 +-----------------+
// Incoming        | +---------------+-+          +-----------------+
// Connections     | |                 |          |                 |  Outgoing
// --------------->| |   svc.listen    |          |                 |  Connections
//                 | |  (1 per listen  |          |   svc.connect   |-------------->
//                 | |    address)     |          |                 |
//                 +-+                 |          |                 |
//                   +-----------------+          +-----------------+
//                            v                            v
//                            |                            |
//                            |                            |
//                            +-------------+--------------+
//                                          |
//                                          |  svc.conns
//                                          v
//                                  +-----------------+
//                                  |                 |
//                                  |                 |
//                                  |   svc.handle    |------> model.AddConnection()
//                                  |                 |
//                                  |                 |
//                                  +-----------------+
//
// TODO: Clean shutdown, and/or handling config changes on the fly. We
// partly do this now - new devices and addresses will be picked up, but
// not new listen addresses and we don't support disconnecting devices
// that are removed and so on...
svc.Add(serviceFunc(svc.connect))
for _, addr := range svc.cfg.Options().ListenAddress {
addr := addr
listener := serviceFunc(func() {
svc.listen(addr)
})
svc.Add(listener)
}
svc.Add(serviceFunc(svc.handle))
return svc
}
func (s *connectionSvc) handle() {
next:
for conn := range conns {
for conn := range s.conns {
cs := conn.ConnectionState()
// We should have negotiated the next level protocol "bep/1.0" as part
@@ -69,13 +130,13 @@ next:
// this one. But in case we are two devices connecting to each other
// in parallel we don't want to do that or we end up with no
// connections still established...
if m.ConnectedTo(remoteID) {
if s.model.ConnectedTo(remoteID) {
l.Infof("Connected to already connected device (%s)", remoteID)
conn.Close()
continue
}
for deviceID, deviceCfg := range cfg.Devices() {
for deviceID, deviceCfg := range s.cfg.Devices() {
if deviceID == remoteID {
// Verify the name on the certificate. By default we set it to
// "syncthing" when generating, but the user may have replaced
@@ -97,7 +158,7 @@ next:
// If rate limiting is set, and based on the address we should
// limit the connection, then we wrap it in a limiter.
limit := shouldLimit(conn.RemoteAddr())
limit := s.shouldLimit(conn.RemoteAddr())
wr := io.Writer(conn)
if limit && writeRateLimit != nil {
@@ -110,7 +171,7 @@ next:
}
name := fmt.Sprintf("%s-%s", conn.LocalAddr(), conn.RemoteAddr())
protoConn := protocol.NewConnection(remoteID, rd, wr, m, name, deviceCfg.Compression)
protoConn := protocol.NewConnection(remoteID, rd, wr, s.model, name, deviceCfg.Compression)
l.Infof("Established secure connection to %s at %s", remoteID, name)
if debugNet {
@@ -121,12 +182,12 @@ next:
"addr": conn.RemoteAddr().String(),
})
m.AddConnection(conn, protoConn)
s.model.AddConnection(conn, protoConn)
continue next
}
}
if !cfg.IgnoredDevice(remoteID) {
if !s.cfg.IgnoredDevice(remoteID) {
events.Default.Log(events.DeviceRejected, map[string]string{
"device": remoteID.String(),
"address": conn.RemoteAddr().String(),
@@ -140,7 +201,7 @@ next:
}
}
func listenTLS(conns chan *tls.Conn, addr string, tlsCfg *tls.Config) {
func (s *connectionSvc) listen(addr string) {
if debugNet {
l.Debugln("listening on", addr)
}
@@ -166,9 +227,9 @@ func listenTLS(conns chan *tls.Conn, addr string, tlsCfg *tls.Config) {
}
tcpConn := conn.(*net.TCPConn)
setTCPOptions(tcpConn)
s.setTCPOptions(tcpConn)
tc := tls.Server(conn, tlsCfg)
tc := tls.Server(conn, s.tlsCfg)
err = tc.Handshake()
if err != nil {
l.Infoln("TLS handshake:", err)
@@ -176,21 +237,20 @@ func listenTLS(conns chan *tls.Conn, addr string, tlsCfg *tls.Config) {
continue
}
conns <- tc
s.conns <- tc
}
}
func dialTLS(m *model.Model, conns chan *tls.Conn, tlsCfg *tls.Config) {
func (s *connectionSvc) connect() {
delay := time.Second
for {
nextDevice:
for deviceID, deviceCfg := range cfg.Devices() {
for deviceID, deviceCfg := range s.cfg.Devices() {
if deviceID == myID {
continue
}
if m.ConnectedTo(deviceID) {
if s.model.ConnectedTo(deviceID) {
continue
}
@@ -238,9 +298,9 @@ func dialTLS(m *model.Model, conns chan *tls.Conn, tlsCfg *tls.Config) {
continue
}
setTCPOptions(conn)
s.setTCPOptions(conn)
tc := tls.Client(conn, tlsCfg)
tc := tls.Client(conn, s.tlsCfg)
err = tc.Handshake()
if err != nil {
l.Infoln("TLS handshake:", err)
@@ -248,20 +308,20 @@ func dialTLS(m *model.Model, conns chan *tls.Conn, tlsCfg *tls.Config) {
continue
}
conns <- tc
s.conns <- tc
continue nextDevice
}
}
time.Sleep(delay)
delay *= 2
if maxD := time.Duration(cfg.Options().ReconnectIntervalS) * time.Second; delay > maxD {
if maxD := time.Duration(s.cfg.Options().ReconnectIntervalS) * time.Second; delay > maxD {
delay = maxD
}
}
}
func setTCPOptions(conn *net.TCPConn) {
func (*connectionSvc) setTCPOptions(conn *net.TCPConn) {
var err error
if err = conn.SetLinger(0); err != nil {
l.Infoln(err)
@@ -277,8 +337,8 @@ func setTCPOptions(conn *net.TCPConn) {
}
}
func shouldLimit(addr net.Addr) bool {
if cfg.Options().LimitBandwidthInLan {
func (s *connectionSvc) shouldLimit(addr net.Addr) bool {
if s.cfg.Options().LimitBandwidthInLan {
return true
}
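The serviceFunc adapter used above when registering the listen, connect and handle routines is not part of this diff. A minimal sketch of an adapter consistent with that usage, assuming suture's Service interface of Serve and Stop:

// Sketch only; the actual definition lives elsewhere in the main package.
// It adapts a plain function so it can be added to a suture.Supervisor.
type serviceFunc func()

// Serve runs the wrapped function; the supervisor logs and restarts it
// if it returns.
func (f serviceFunc) Serve() { f() }

// Stop is a no-op; these routines have no shutdown signal of their own.
func (f serviceFunc) Stop() {}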

View File

@@ -22,7 +22,6 @@ import (
"runtime"
"strconv"
"strings"
"sync"
"time"
"github.com/calmh/logger"
@@ -34,6 +33,7 @@ import (
"github.com/syncthing/syncthing/internal/events"
"github.com/syncthing/syncthing/internal/model"
"github.com/syncthing/syncthing/internal/osutil"
"github.com/syncthing/syncthing/internal/sync"
"github.com/syncthing/syncthing/internal/upgrade"
"github.com/vitrun/qart/qr"
"golang.org/x/crypto/bcrypt"
@@ -45,16 +45,16 @@ type guiError struct {
}
var (
configInSync = true
guiErrors = []guiError{}
guiErrorsMut sync.Mutex
startTime = time.Now()
configInSync = true
guiErrors = []guiError{}
guiErrorsMut sync.Mutex = sync.NewMutex()
startTime = time.Now()
eventSub *events.BufferedSubscription
)
var (
lastEventRequest time.Time
lastEventRequestMut sync.Mutex
lastEventRequestMut sync.Mutex = sync.NewMutex()
)
func startGUI(cfg config.GUIConfiguration, assetDir string, m *model.Model) error {
@@ -111,7 +111,7 @@ func startGUI(cfg config.GUIConfiguration, assetDir string, m *model.Model) erro
getRestMux.HandleFunc("/rest/db/completion", withModel(m, restGetDBCompletion)) // device folder
getRestMux.HandleFunc("/rest/db/file", withModel(m, restGetDBFile)) // folder file
getRestMux.HandleFunc("/rest/db/ignores", withModel(m, restGetDBIgnores)) // folder
getRestMux.HandleFunc("/rest/db/need", withModel(m, restGetDBNeed)) // folder
getRestMux.HandleFunc("/rest/db/need", withModel(m, restGetDBNeed)) // folder [perpage] [page]
getRestMux.HandleFunc("/rest/db/status", withModel(m, restGetDBStatus)) // folder
getRestMux.HandleFunc("/rest/db/browse", withModel(m, restGetDBBrowse)) // folder [prefix] [dirsonly] [levels]
getRestMux.HandleFunc("/rest/events", restGetEvents) // since [limit]
@@ -133,7 +133,7 @@ func startGUI(cfg config.GUIConfiguration, assetDir string, m *model.Model) erro
// The POST handlers
postRestMux := http.NewServeMux()
postRestMux.HandleFunc("/rest/db/prio", withModel(m, restPostDBPrio)) // folder file
postRestMux.HandleFunc("/rest/db/prio", withModel(m, restPostDBPrio)) // folder file [perpage] [page]
postRestMux.HandleFunc("/rest/db/ignores", withModel(m, restPostDBIgnores)) // folder
postRestMux.HandleFunc("/rest/db/override", withModel(m, restPostDBOverride)) // folder
postRestMux.HandleFunc("/rest/db/scan", withModel(m, restPostDBScan)) // folder [sub...]
@@ -379,15 +379,29 @@ func restPostDBOverride(m *model.Model, w http.ResponseWriter, r *http.Request)
}
func restGetDBNeed(m *model.Model, w http.ResponseWriter, r *http.Request) {
var qs = r.URL.Query()
var folder = qs.Get("folder")
qs := r.URL.Query()
folder := qs.Get("folder")
page, err := strconv.Atoi(qs.Get("page"))
if err != nil || page < 1 {
page = 1
}
perpage, err := strconv.Atoi(qs.Get("perpage"))
if err != nil || perpage < 1 {
perpage = 1 << 16
}
progress, queued, rest, total := m.NeedFolderFiles(folder, page, perpage)
progress, queued, rest := m.NeedFolderFiles(folder, 100)
// Convert the struct to a more loose structure, and inject the size.
output := map[string][]jsonDBFileInfo{
output := map[string]interface{}{
"progress": toNeedSlice(progress),
"queued": toNeedSlice(queued),
"rest": toNeedSlice(rest),
"total": total,
"page": page,
"perpage": perpage,
}
w.Header().Set("Content-Type", "application/json; charset=utf-8")
@@ -522,7 +536,7 @@ func flushResponse(s string, w http.ResponseWriter) {
}
var cpuUsagePercent [10]float64 // The last ten seconds
var cpuUsageLock sync.RWMutex
var cpuUsageLock sync.RWMutex = sync.NewRWMutex()
func restGetSystemStatus(w http.ResponseWriter, r *http.Request) {
var m runtime.MemStats
@@ -686,7 +700,8 @@ func restGetSystemUpgrade(w http.ResponseWriter, r *http.Request) {
res := make(map[string]interface{})
res["running"] = Version
res["latest"] = rel.Tag
res["newer"] = upgrade.CompareVersions(rel.Tag, Version) == 1
res["newer"] = upgrade.CompareVersions(rel.Tag, Version) == upgrade.Newer
res["majorNewer"] = upgrade.CompareVersions(rel.Tag, Version) == upgrade.MajorNewer
w.Header().Set("Content-Type", "application/json; charset=utf-8")
json.NewEncoder(w).Encode(res)
@@ -727,7 +742,7 @@ func restPostSystemUpgrade(w http.ResponseWriter, r *http.Request) {
return
}
if upgrade.CompareVersions(rel.Tag, Version) == 1 {
if upgrade.CompareVersions(rel.Tag, Version) > upgrade.Equal {
err = upgrade.To(rel)
if err != nil {
l.Warnln("upgrading:", err)

View File

@@ -12,16 +12,16 @@ import (
"math/rand"
"net/http"
"strings"
"sync"
"time"
"github.com/syncthing/syncthing/internal/config"
"github.com/syncthing/syncthing/internal/sync"
"golang.org/x/crypto/bcrypt"
)
var (
sessions = make(map[string]bool)
sessionsMut sync.Mutex
sessions = make(map[string]bool)
sessionsMut sync.Mutex = sync.NewMutex()
)
func basicAuthAndSessionMiddleware(cfg config.GUIConfiguration, next http.Handler) http.Handler {

View File

@@ -12,14 +12,14 @@ import (
"net/http"
"os"
"strings"
"sync"
"time"
"github.com/syncthing/syncthing/internal/osutil"
"github.com/syncthing/syncthing/internal/sync"
)
var csrfTokens []string
var csrfMut sync.Mutex
var csrfMut sync.Mutex = sync.NewMutex()
// Check for CSRF token on /rest/ URLs. If a correct one is not given, reject
// the request with 403. For / and /index.html, set a new CSRF cookie if none

View File

@@ -11,6 +11,7 @@ import (
"path/filepath"
"runtime"
"strings"
"time"
"github.com/syncthing/syncthing/internal/osutil"
)
@@ -29,6 +30,7 @@ const (
locLogFile = "logFile"
locCsrfTokens = "csrfTokens"
locPanicLog = "panicLog"
locAuditLog = "auditLog"
locDefFolder = "defFolder"
)
@@ -48,7 +50,8 @@ var locations = map[locationEnum]string{
locDatabase: "${config}/index-v0.11.0.db",
locLogFile: "${config}/syncthing.log", // -logfile on Windows
locCsrfTokens: "${config}/csrftokens.txt",
locPanicLog: "${config}/panic-20060102-150405.log", // passed through time.Format()
locPanicLog: "${config}/panic-${timestamp}.log",
locAuditLog: "${config}/audit-${timestamp}.log",
locDefFolder: "${home}/Sync",
}
@@ -107,3 +110,14 @@ func homeDir() string {
}
return home
}
func timestampedLoc(key locationEnum) string {
// We take the roundtrip via "${timestamp}" instead of passing the path
// directly through time.Format() to avoid issues when the path we are
// expanding contains numbers; otherwise for example
// /home/user2006/.../panic-20060102-150405.log would get both instances of
// 2006 replaced by 2015...
tpl := locations[key]
now := time.Now().Format("20060102-150405")
return strings.Replace(tpl, "${timestamp}", now, -1)
}
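A standalone illustration of the roundtrip described above; the path is made up, mirroring the comment's own example:

package main

import (
	"fmt"
	"strings"
	"time"
)

func main() {
	// Replacing a ${timestamp} token instead of running the whole path
	// through time.Format means digits already present in the path
	// (like "user2006") are left untouched.
	tpl := "/home/user2006/.config/syncthing/panic-${timestamp}.log"
	now := time.Now().Format("20060102-150405")
	fmt.Println(strings.Replace(tpl, "${timestamp}", now, -1))
}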

View File

@@ -39,6 +39,7 @@ import (
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/errors"
"github.com/syndtr/goleveldb/leveldb/opt"
"github.com/thejerf/suture"
"golang.org/x/crypto/bcrypt"
)
@@ -151,6 +152,7 @@ are mostly useful for developers. Use with care.
- "events" (the events package)
- "files" (the files package)
- "http" (the main package; HTTP requests)
- "locks" (the sync package; trace long held locks)
- "net" (the main package; connections & network messages)
- "model" (the model package)
- "scanner" (the scanner package)
@@ -194,6 +196,7 @@ var (
noConsole bool
generateDir string
logFile string
auditEnabled bool
noRestart = os.Getenv("STNORESTART") != ""
noUpgrade = os.Getenv("STNOUPGRADE") != ""
guiAddress = os.Getenv("STGUIADDRESS") // legacy
@@ -230,6 +233,7 @@ func main() {
flag.BoolVar(&doUpgradeCheck, "upgrade-check", false, "Check for available upgrade")
flag.BoolVar(&showVersion, "version", false, "Show version")
flag.StringVar(&upgradeTo, "upgrade-to", upgradeTo, "Force upgrade directly from specified URL")
flag.BoolVar(&auditEnabled, "audit", false, "Write events to audit file")
flag.Usage = usageFor(flag.CommandLine, usage, fmt.Sprintf(extraUsage, baseDirs["config"]))
flag.Parse()
@@ -372,7 +376,23 @@ func main() {
}
func syncthingMain() {
var err error
// Create a main service manager. We'll add things to this as we go along.
// We want any logging it does to go through our log system, with INFO
// severity.
mainSvc := suture.New("main", suture.Spec{
Log: func(line string) {
l.Infoln(line)
},
})
mainSvc.ServeBackground()
// Set a log prefix similar to the ID we will have later on, or early log
// lines look ugly.
l.SetPrefix("[start] ")
if auditEnabled {
startAuditing(mainSvc)
}
if len(os.Getenv("GOMAXPROCS")) == 0 {
runtime.GOMAXPROCS(runtime.NumCPU())
@@ -381,7 +401,7 @@ func syncthingMain() {
events.Default.Log(events.Starting, map[string]string{"home": baseDirs["config"]})
// Ensure that we have a certificate and key.
cert, err = tls.LoadX509KeyPair(locations[locCertFile], locations[locKeyFile])
cert, err := tls.LoadX509KeyPair(locations[locCertFile], locations[locKeyFile])
if err != nil {
cert, err = newCertificate(locations[locCertFile], locations[locKeyFile], tlsDefaultCommonName)
if err != nil {
@@ -564,7 +584,9 @@ func syncthingMain() {
// Routine to connect out to configured devices
discoverer = discovery(externalPort)
go listenConnect(myID, m, tlsCfg)
connectionSvc := newConnectionSvc(cfg, myID, m, tlsCfg)
mainSvc.Add(connectionSvc)
for _, folder := range cfg.Folders() {
// Routine to pull blocks from other devices to synchronize the local
@@ -638,10 +660,29 @@ func syncthingMain() {
code := <-stop
mainSvc.Stop()
l.Okln("Exiting")
os.Exit(code)
}
func startAuditing(mainSvc *suture.Supervisor) {
auditFile := timestampedLoc(locAuditLog)
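// Open with O_EXCL: the timestamped name should not exist yet, so fail
// loudly rather than append to or truncate an existing audit log.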
fd, err := os.OpenFile(auditFile, os.O_WRONLY|os.O_CREATE|os.O_EXCL, 0600)
if err != nil {
l.Fatalln("Audit:", err)
}
auditSvc := newAuditSvc(fd)
mainSvc.Add(auditSvc)
// We wait for the audit service to fully start before we return, to
// ensure we capture all events from the start.
auditSvc.WaitForStart()
l.Infoln("Audit log in", auditFile)
}
func setupGUI(cfg *config.Wrapper, m *model.Model) {
opts := cfg.Options()
guiCfg := overrideGUIConfig(cfg.GUI(), guiAddress, guiAuthentication, guiAPIKey)
@@ -1011,6 +1052,7 @@ func autoUpgrade() {
func cleanConfigDirectory() {
patterns := map[string]time.Duration{
"panic-*.log": 7 * 24 * time.Hour, // keep panic logs for a week
"audit-*.log": 7 * 24 * time.Hour, // keep audit logs for a week
"index": 14 * 24 * time.Hour, // keep old index format for two weeks
"config.xml.v*": 30 * 24 * time.Hour, // old config versions for a month
"*.idx.gz": 30 * 24 * time.Hour, // these should for sure no longer exist

View File

@@ -20,6 +20,10 @@ import (
)
func TestFolderErrors(t *testing.T) {
// This test intentionally avoids starting the folders. If they are
// started, they will perform an initial scan, which will create missing
// folder markers and race with the stuff we do in the test.
fcfg := config.FolderConfiguration{
ID: "folder",
RawPath: "testdata/testfolder",
@@ -29,10 +33,8 @@ func TestFolderErrors(t *testing.T) {
})
for _, file := range []string{".stfolder", "testfolder/.stfolder", "testfolder"} {
os.Remove("testdata/" + file)
_, err := os.Stat("testdata/" + file)
if err == nil {
t.Error("Found unexpected file")
if err := os.Remove("testdata/" + file); err != nil && !os.IsNotExist(err) {
t.Fatal(err)
}
}
@@ -42,7 +44,6 @@ func TestFolderErrors(t *testing.T) {
m := model.NewModel(cfg, protocol.LocalDeviceID, "device", "syncthing", "dev", ldb)
m.AddFolder(fcfg)
m.StartFolderRW("folder")
if err := m.CheckFolderHealth("folder"); err != nil {
t.Error("Unexpected error", cfg.Folders()["folder"].Invalid)
@@ -58,8 +59,12 @@ func TestFolderErrors(t *testing.T) {
t.Error(err)
}
os.Remove("testdata/testfolder/.stfolder")
os.Remove("testdata/testfolder/")
if err := os.Remove("testdata/testfolder/.stfolder"); err != nil {
t.Fatal(err)
}
if err := os.Remove("testdata/testfolder/"); err != nil {
t.Fatal(err)
}
// Case 2 - new folder, marker created
@@ -70,7 +75,6 @@ func TestFolderErrors(t *testing.T) {
m = model.NewModel(cfg, protocol.LocalDeviceID, "device", "syncthing", "dev", ldb)
m.AddFolder(fcfg)
m.StartFolderRW("folder")
if err := m.CheckFolderHealth("folder"); err != nil {
t.Error("Unexpected error", cfg.Folders()["folder"].Invalid)
@@ -81,7 +85,9 @@ func TestFolderErrors(t *testing.T) {
t.Error(err)
}
os.Remove("testdata/.stfolder")
if err := os.Remove("testdata/.stfolder"); err != nil {
t.Fatal(err)
}
// Case 3 - Folder marker missing
@@ -92,7 +98,6 @@ func TestFolderErrors(t *testing.T) {
m = model.NewModel(cfg, protocol.LocalDeviceID, "device", "syncthing", "dev", ldb)
m.AddFolder(fcfg)
m.StartFolderRW("folder")
if err := m.CheckFolderHealth("folder"); err == nil || err.Error() != "folder marker missing" {
t.Error("Incorrect error: Folder marker missing !=", m.CheckFolderHealth("folder"))
@@ -110,8 +115,12 @@ func TestFolderErrors(t *testing.T) {
// Case 4 - Folder path missing
os.Remove("testdata/testfolder/.stfolder")
os.Remove("testdata/testfolder/")
if err := os.Remove("testdata/testfolder/.stfolder"); err != nil && !os.IsNotExist(err) {
t.Fatal(err)
}
if err := os.Remove("testdata/testfolder"); err != nil && !os.IsNotExist(err) {
t.Fatal(err)
}
fcfg.RawPath = "testdata/testfolder"
cfg = config.Wrap("testdata/subfolder", config.Configuration{
@@ -120,7 +129,6 @@ func TestFolderErrors(t *testing.T) {
m = model.NewModel(cfg, protocol.LocalDeviceID, "device", "syncthing", "dev", ldb)
m.AddFolder(fcfg)
m.StartFolderRW("folder")
if err := m.CheckFolderHealth("folder"); err == nil || err.Error() != "folder path missing" {
t.Error("Incorrect error: Folder path missing !=", m.CheckFolderHealth("folder"))
@@ -128,7 +136,9 @@ func TestFolderErrors(t *testing.T) {
// Case 4.1 - recover after folder path missing
os.Mkdir("testdata/testfolder", 0700)
if err := os.Mkdir("testdata/testfolder", 0700); err != nil {
t.Fatal(err)
}
if err := m.CheckFolderHealth("folder"); err == nil || err.Error() != "folder marker missing" {
t.Error("Incorrect error: Folder marker missing !=", m.CheckFolderHealth("folder"))

View File

@@ -14,17 +14,17 @@ import (
"os/signal"
"runtime"
"strings"
"sync"
"syscall"
"time"
"github.com/syncthing/syncthing/internal/osutil"
"github.com/syncthing/syncthing/internal/sync"
)
var (
stdoutFirstLines []string // The first 10 lines of stdout
stdoutLastLines []string // The last 50 lines of stdout
stdoutMut sync.Mutex
stdoutFirstLines []string // The first 10 lines of stdout
stdoutLastLines []string // The last 50 lines of stdout
stdoutMut sync.Mutex = sync.NewMutex()
)
const (
@@ -163,7 +163,7 @@ func copyStderr(stderr io.ReadCloser, dst io.Writer) {
dst.Write([]byte(line))
if strings.HasPrefix(line, "panic:") || strings.HasPrefix(line, "fatal error:") {
panicFd, err = os.Create(time.Now().Format(locations[locPanicLog]))
panicFd, err = os.Create(timestampedLoc(locPanicLog))
if err != nil {
l.Warnln("Create panic log:", err)
continue

View File

@@ -7,11 +7,11 @@
package main
import (
"sync"
"time"
"github.com/syncthing/syncthing/internal/events"
"github.com/syncthing/syncthing/internal/model"
"github.com/syncthing/syncthing/internal/sync"
"github.com/thejerf/suture"
)
@@ -37,6 +37,7 @@ func (c *folderSummarySvc) Serve() {
c.stop = make(chan struct{})
c.folders = make(map[string]struct{})
c.srv = srv
c.foldersMut = sync.NewMutex()
srv.Serve()
}

View File

Binary image file changed (3.7 KiB → 3.6 KiB).

View File

@@ -1,4 +1,5 @@
{
"A new major version may not be compatible with previous versions.": "A new major version may not be compatible with previous versions.",
"API Key": "API Key",
"About": "About",
"Add": "Add",
@@ -9,6 +10,7 @@
"Addresses": "Addresses",
"All Data": "All Data",
"Allow Anonymous Usage Reporting?": "Allow Anonymous Usage Reporting?",
"Alphabetic": "Alphabetic",
"An external command handles the versioning. It has to remove the file from the synced folder.": "An external command handles the versioning. It has to remove the file from the synced folder.",
"Anonymous Usage Reporting": "Anonymous Usage Reporting",
"Any devices configured on an introducer device will be added to this device as well.": "Any devices configured on an introducer device will be added to this device as well.",
@@ -44,6 +46,7 @@
"Enter ignore patterns, one per line.": "Enter ignore patterns, one per line.",
"Error": "Error",
"External File Versioning": "External File Versioning",
"File Pull Order": "File Pull Order",
"File Versioning": "File Versioning",
"File permission bits are ignored when looking for changes. Use on FAT file systems.": "File permission bits are ignored when looking for changes. Use on FAT file systems.",
"Files are moved to date stamped versions in a .stversions folder when replaced or deleted by Syncthing.": "Files are moved to date stamped versions in a .stversions folder when replaced or deleted by Syncthing.",
@@ -66,11 +69,13 @@
"Introducer": "Introducer",
"Inversion of the given condition (i.e. do not exclude)": "Inversion of the given condition (i.e. do not exclude)",
"Keep Versions": "Keep Versions",
"Largest First": "Largest First",
"Last File Received": "Last File Received",
"Last seen": "Last seen",
"Later": "Later",
"Local Discovery": "Local Discovery",
"Local State": "Local State",
"Major Upgrade": "Major Upgrade",
"Maximum Age": "Maximum Age",
"Metadata Only": "Metadata Only",
"Move to top of queue": "Move to top of queue",
@@ -78,22 +83,27 @@
"Never": "Never",
"New Device": "New Device",
"New Folder": "New Folder",
"Newest First": "Newest First",
"No": "No",
"No File Versioning": "No File Versioning",
"Notice": "Notice",
"OK": "OK",
"Off": "Off",
"Oldest First": "Oldest First",
"Out Of Sync": "Out Of Sync",
"Out of Sync Items": "Out of Sync Items",
"Outgoing Rate Limit (KiB/s)": "Outgoing Rate Limit (KiB/s)",
"Override Changes": "Override Changes",
"Path to the folder on the local computer. Will be created if it does not exist. The tilde character (~) can be used as a shortcut for": "Path to the folder on the local computer. Will be created if it does not exist. The tilde character (~) can be used as a shortcut for",
"Path where versions should be stored (leave empty for the default .stversions folder in the folder).": "Path where versions should be stored (leave empty for the default .stversions folder in the folder).",
"Please consult the release notes before performing a major upgrade.": "Please consult the release notes before performing a major upgrade.",
"Please wait": "Please wait",
"Preview": "Preview",
"Preview Usage Report": "Preview Usage Report",
"Quick guide to supported patterns": "Quick guide to supported patterns",
"RAM Utilization": "RAM Utilization",
"Random": "Random",
"Release Notes": "Release Notes",
"Rescan": "Rescan",
"Rescan All": "Rescan All",
"Rescan Interval": "Rescan Interval",
@@ -120,6 +130,7 @@
"Shutdown Complete": "Shutdown Complete",
"Simple File Versioning": "Simple File Versioning",
"Single level wildcard (matches within a directory only)": "Single level wildcard (matches within a directory only)",
"Smallest First": "Smallest First",
"Source Code": "Source Code",
"Staggered File Versioning": "Staggered File Versioning",
"Start Browser": "Start Browser",
@@ -151,10 +162,12 @@
"The number of versions must be a number and cannot be blank.": "The number of versions must be a number and cannot be blank.",
"The path cannot be blank.": "The path cannot be blank.",
"The rescan interval must be a non-negative number of seconds.": "The rescan interval must be a non-negative number of seconds.",
"This is a major version upgrade.": "This is a major version upgrade.",
"Unknown": "Unknown",
"Unshared": "Unshared",
"Unused": "Unused",
"Up to Date": "Up to Date",
"Upgrade": "Upgrade",
"Upgrade To {%version%}": "Upgrade To {{version}}",
"Upgrading": "Upgrading",
"Upload Rate": "Upload Rate",

View File

@@ -38,6 +38,12 @@
<span translate translate-value-version="{{upgradeInfo.latest}}">Upgrade To {%version%}</span>
</button>
</li>
<li ng-if="upgradeInfo && upgradeInfo.majorNewer">
<button type="button" class="btn navbar-btn btn-danger btn-sm" href="" ng-click="upgradeMajor()">
<span class="glyphicon glyphicon-chevron-up"></span>&emsp;
<span translate translate-value-version="{{upgradeInfo.latest}}">Upgrade To {%version%}</span>
</button>
</li>
<li class="dropdown" language-select></li>
<li class="dropdown">
<a href="#" class="dropdown-toggle" data-toggle="dropdown"><span class="glyphicon glyphicon-cog" aria-label="Edit"></span></a>
@@ -233,6 +239,17 @@
<th><span class="glyphicon glyphicon-refresh"></span>&emsp;<span translate>Rescan Interval</span></th>
<td class="text-right">{{folder.rescanIntervalS}} s</td>
</tr>
<tr ng-if="folder.order != 'random'">
<th><span class="glyphicon glyphicon-sort"></span>&emsp;<span translate>File Pull Order</span></th>
<td class="text-right" ng-switch="folder.order">
<span ng-switch-when="random" translate>Random</span>
<span ng-switch-when="alphabetic" translate>Alphabetic</span>
<span ng-switch-when="smallestFirst" translate>Smallest First</span>
<span ng-switch-when="largestFirst" translate>Largest First</span>
<span ng-switch-when="oldestFirst" translate>Oldest First</span>
<span ng-switch-when="newestFirst" translate>Newest First</span>
</td>
</tr>
<tr ng-if="folder.versioning.type">
<th><span class="glyphicon glyphicon-tags"></span>&emsp;<span translate>File Versioning</span></th>
<td class="text-right" ng-switch="folder.versioning.type">
@@ -466,6 +483,35 @@
<img ng-if="myID" class="center-block img-thumbnail" ng-src="qr/?text={{myID}}"/>
</modal>
<!-- Major upgrade modal -->
<div id="majorUpgrade" class="modal fade" tabindex="-1" data-backdrop="true" data-keyboard="true">
<div class="modal-dialog">
<div class="modal-content">
<div class="modal-header alert alert-danger">
<h4 class="modal-title">
<span ng-if="icon" class="glyphicon glyphicon-chevron-up"></span>
<span translate>Major Upgrade</span>
</h4>
</div>
<div class="modal-body">
<p>
<span translate>This is a major version upgrade.</span>
<span translate>A new major version may not be compatible with previous versions.</span>
<span translate>Please consult the release notes before performing a major upgrade.</span>
</p>
<p>
<a href="https://github.com/syncthing/syncthing/releases/latest" target="_blank" translate>Release Notes</a>
</p>
</div>
<div class="modal-footer">
<button type="button" class="btn btn-primary btn-sm" ng-click="upgrade()"><span class="glyphicon glyphicon-ok"></span>&emsp;<span translate>Upgrade</span></button>
<button type="button" class="btn btn-default btn-sm" data-dismiss="modal"><span class="glyphicon glyphicon-remove"></span>&emsp;<span translate>Close</span></button>
</div>
</div>
</div>
</div>
<!-- Device editor modal -->
<div id="editDevice" class="modal fade" tabindex="-1">
@@ -590,6 +636,7 @@
</div>
</div>
<div class="row">
<!-- Left column -->
<div class="col-md-6">
<div class="form-group">
<div class="checkbox">
@@ -608,7 +655,20 @@
<p translate class="help-block">File permission bits are ignored when looking for changes. Use on FAT file systems.</p>
</div>
</div>
<!-- Right column-->
<div class="col-md-6">
<div class="form-group">
<label translate>File Pull Order</label>
<select class="form-control" ng-model="currentFolder.order">
<option value="random" translate>Random</option>
<option value="alphabetic" translate>Alphabetic</option>
<option value="smallestFirst" translate>Smallest First</option>
<option value="largestFirst" translate>Largest First</option>
<option value="oldestFirst" translate>Oldest First</option>
<option value="newestFirst" translate>Newest First</option>
</select>
</div>
<div class="form-group">
<label translate>File Versioning</label>
<div class="radio">
@@ -907,10 +967,22 @@
<hr/>
<table class="table table-striped table-condensed">
<tr ng-repeat="f in needed.progress" ng-init="a = needAction(f)">
<td class="small-data"><span class="glyphicon glyphicon-{{needIcons[a]}}"></span> {{needActions[a]}}</td>
<td title="{{f.name}}">{{f.name | basename}}</td>
<td ng-if="a == 'sync' && progress[neededFolder] && progress[neededFolder][f.name]">
<tr dir-paginate="f in needed | itemsPerPage: neededPageSize" current-page="neededCurrentPage" total-items="neededTotal">
<!-- Icon -->
<td class="small-data"><span class="glyphicon glyphicon-{{needIcons[f.action]}}"></span> {{needActions[f.action]}}</td>
<!-- Name -->
<td ng-if="f.type != 'queued'" title="{{f.name}}">{{f.name | basename}}</td>
<td ng-if="f.type == 'queued'">
<a href="" ng-click="bumpFile(neededFolder, f.name)" title="{{'Move to top of queue' | translate}}">
<span class="glyphicon glyphicon-eject"></span>
</a>
<span title="{{f.name}}">&ensp;{{f.name | basename}}</span>
</td>
<!-- Size/Progress -->
<td ng-if="f.type == 'progress' && f.action == 'sync' && progress[neededFolder] && progress[neededFolder][f.name]">
<div class="progress">
<div class="progress-bar progress-bar-success" style="width: {{progress[neededFolder][f.name].reused}}%"></div>
<div class="progress-bar" style="width: {{progress[neededFolder][f.name].copiedFromOrigin}}%"></div>
@@ -922,24 +994,20 @@
</span>
</div>
</td>
<td class="text-right small-data" ng-if="a != 'sync' || !progress[neededFolder] || !progress[neededFolder][f.name]">
<td class="text-right small-data" ng-if="f.type != 'progress' || f.action != 'sync' || !progress[neededFolder] || !progress[neededFolder][f.name]">
<span ng-if="f.size > 0">{{f.size | binary}}B</span>
</td>
</tr>
<tr ng-repeat="f in needed.queued" ng-init="a = needAction(f)">
<td class="small-data"><span class="glyphicon glyphicon-{{needIcons[a]}}"></span> {{needActions[a]}}</td>
<td><a href="" ng-if="$index != 0" ng-click="bumpFile(neededFolder, f.name)" title="{{'Move to top of queue' | translate}}"><span class="glyphicon glyphicon-eject"></span></a><span ng-if="$index != 0">&ensp;</span><span title="{{f.name}}">{{f.name | basename}}</span></td>
<td class="text-right small-data">
<span ng-if="f.size > 0">{{f.size | binary}}B</span>
</td>
</tr>
<tr ng-repeat="f in needed.rest" ng-init="a = needAction(f)">
<td class="small-data"><span class="glyphicon glyphicon-{{needIcons[a]}}"></span> {{needActions[a]}}</td>
<td title="{{f.name}}">{{f.name | basename}}</td>
<td class="text-right small-data"><span ng-if="f.size > 0">{{f.size | binary}}B</span></td>
</tr>
</table>
<dir-pagination-controls on-page-change="neededPageChanged(newPageNumber)"></dir-pagination-controls>
<ul class="pagination pull-right">
<li ng-repeat="option in [10, 20, 30, 50, 100]" ng-class="{ active: neededPageSize == option }">
<a href="#" ng-click="neededChangePageSize(option)">{{option}}</a>
</li>
</ul>
<div class="clearfix">
</modal>
<!-- About modal -->
@@ -970,6 +1038,7 @@
<li class="auto-generated">Daniel Martí</li>
<li class="auto-generated">Dennis Wilson</li>
<li class="auto-generated">Dominik Heidler</li>
<li class="auto-generated">Elias Jarlebring</li>
<li class="auto-generated">Emil Hessman</li>
<li class="auto-generated">Federico Castagnini</li>
<li class="auto-generated">Felix Ableitner</li>
@@ -1027,6 +1096,7 @@
<script src="vendor/angular/angular.min.js"></script>
<script src="vendor/angular/angular-translate.min.js"></script>
<script src="vendor/angular/angular-translate-loader.min.js"></script>
<script src="vendor/angular/angular-dirPagination.js"></script>
<script src="vendor/jquery/jquery-2.0.3.min.js"></script>
<script src="vendor/bootstrap/js/bootstrap.min.js"></script>
<!-- / vendor scripts -->

View File

@@ -9,6 +9,7 @@
/*global $: false, angular: false, console: false, validLangs: false */
var syncthing = angular.module('syncthing', [
'angularUtils.directives.dirPagination',
'pascalprecht.translate',
'syncthing.core'

View File

@@ -40,6 +40,10 @@ angular.module('syncthing.core')
$scope.folderStats = {};
$scope.progress = {};
$scope.version = {};
$scope.needed = [];
$scope.neededTotal = 0;
$scope.neededCurrentPage = 1;
$scope.neededPageSize = 10;
$(window).bind('beforeunload', function () {
navigatingAway = true;
@@ -415,14 +419,63 @@ angular.module('syncthing.core')
}
function refreshNeed(folder) {
$http.get(urlbase + "/db/need?folder=" + encodeURIComponent(folder)).success(function (data) {
var url = urlbase + "/db/need?folder=" + encodeURIComponent(folder);
url += "&page=" + $scope.neededCurrentPage;
url += "&perpage=" + $scope.neededPageSize;
$http.get(url).success(function (data) {
if ($scope.neededFolder == folder) {
console.log("refreshNeed", folder, data);
$scope.needed = data;
parseNeeded(data);
}
}).error($scope.emitHTTPError);
}
function needAction(file) {
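// 4096 (1<<12) and 16384 (1<<14) mirror the protocol's "deleted" and
// "directory" file flag bits.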
var fDelete = 4096;
var fDirectory = 16384;
if ((file.flags & (fDelete + fDirectory)) === fDelete + fDirectory) {
return 'rmdir';
} else if ((file.flags & fDelete) === fDelete) {
return 'rm';
} else if ((file.flags & fDirectory) === fDirectory) {
return 'touch';
} else {
return 'sync';
}
};
function parseNeeded(data) {
var merged = [];
data.progress.forEach(function (item) {
item.type = "progress";
item.action = needAction(item);
merged.push(item);
});
data.queued.forEach(function (item) {
item.type = "queued";
item.action = needAction(item);
merged.push(item);
});
data.rest.forEach(function (item) {
item.type = "rest";
item.action = needAction(item);
merged.push(item);
});
$scope.needed = merged;
$scope.neededTotal = data.total;
}
$scope.neededPageChanged = function (page) {
$scope.neededCurrentPage = page;
refreshNeed($scope.neededFolder);
};
$scope.neededChangePageSize = function (perpage) {
$scope.neededPageSize = perpage;
refreshNeed($scope.neededFolder);
}
var refreshDeviceStats = debounce(function () {
$http.get(urlbase + "/stats/device").success(function (data) {
$scope.deviceStats = data;
@@ -698,6 +751,7 @@ angular.module('syncthing.core')
$scope.upgrade = function () {
restarting = true;
$('#majorUpgrade').modal('hide');
$('#upgrading').modal();
$http.post(urlbase + '/system/upgrade').success(function () {
$('#restarting').modal();
@@ -707,6 +761,10 @@ angular.module('syncthing.core')
});
};
$scope.upgradeMajor = function () {
$('#majorUpgrade').modal();
};
$scope.shutdown = function () {
restarting = true;
$http.post(urlbase + '/system/shutdown').success(function () {
@@ -1176,24 +1234,11 @@ angular.module('syncthing.core')
$('#needed').modal().on('hidden.bs.modal', function () {
$scope.neededFolder = undefined;
$scope.needed = undefined;
$scope.neededTotal = 0;
$scope.neededCurrentPage = 1;
});
};
$scope.needAction = function (file) {
var fDelete = 4096;
var fDirectory = 16384;
if ((file.flags & (fDelete + fDirectory)) === fDelete + fDirectory) {
return 'rmdir';
} else if ((file.flags & fDelete) === fDelete) {
return 'rm';
} else if ((file.flags & fDirectory) === fDirectory) {
return 'touch';
} else {
return 'sync';
}
};
$scope.override = function (folder) {
$http.post(urlbase + "/db/override?folder=" + encodeURIComponent(folder));
};
@@ -1215,10 +1260,14 @@ angular.module('syncthing.core')
};
$scope.bumpFile = function (folder, file) {
$http.post(urlbase + "/db/prio?folder=" + encodeURIComponent(folder) + "&file=" + encodeURIComponent(file)).success(function (data) {
var url = urlbase + "/db/prio?folder=" + encodeURIComponent(folder) + "&file=" + encodeURIComponent(file);
// In order to get the right view of data in the response.
url += "&page=" + $scope.neededCurrentPage;
url += "&perpage=" + $scope.neededPageSize;
$http.post(url).success(function (data) {
if ($scope.neededFolder == folder) {
console.log("bumpFile", folder, data);
$scope.needed = data;
parseNeeded(data);
}
}).error($scope.emitHTTPError);
};

View File

@@ -0,0 +1,520 @@
/**
* dirPagination - AngularJS module for paginating (almost) anything.
*
*
* Credits
* =======
*
* Daniel Tabuenca: https://groups.google.com/d/msg/angular/an9QpzqIYiM/r8v-3W1X5vcJ
* for the idea on how to dynamically invoke the ng-repeat directive.
*
* I borrowed a couple of lines and a few attribute names from the AngularUI Bootstrap project:
* https://github.com/angular-ui/bootstrap/blob/master/src/pagination/pagination.js
*
* Copyright 2014 Michael Bromley <michael@michaelbromley.co.uk>
*/
(function() {
/**
* Config
*/
var moduleName = 'angularUtils.directives.dirPagination';
var DEFAULT_ID = '__default';
/**
* Module
*/
var module;
try {
module = angular.module(moduleName);
} catch(err) {
// named module does not exist, so create one
module = angular.module(moduleName, []);
}
module
.directive('dirPaginate', ['$compile', '$parse', 'paginationService', dirPaginateDirective])
.directive('dirPaginateNoCompile', noCompileDirective)
.directive('dirPaginationControls', ['paginationService', 'paginationTemplate', dirPaginationControlsDirective])
.filter('itemsPerPage', ['paginationService', itemsPerPageFilter])
.service('paginationService', paginationService)
.provider('paginationTemplate', paginationTemplateProvider)
.run(['$templateCache',dirPaginationControlsTemplateInstaller]);
function dirPaginateDirective($compile, $parse, paginationService) {
return {
terminal: true,
multiElement: true,
compile: dirPaginationCompileFn
};
function dirPaginationCompileFn(tElement, tAttrs){
var expression = tAttrs.dirPaginate;
// regex taken directly from https://github.com/angular/angular.js/blob/master/src/ng/directive/ngRepeat.js#L211
var match = expression.match(/^\s*([\s\S]+?)\s+in\s+([\s\S]+?)(?:\s+track\s+by\s+([\s\S]+?))?\s*$/);
var filterPattern = /\|\s*itemsPerPage\s*:[^|]*/;
if (match[2].match(filterPattern) === null) {
throw 'pagination directive: the \'itemsPerPage\' filter must be set.';
}
var itemsPerPageFilterRemoved = match[2].replace(filterPattern, '');
var collectionGetter = $parse(itemsPerPageFilterRemoved);
addNoCompileAttributes(tElement);
// If any value is specified for paginationId, we register the un-evaluated expression at this stage for the benefit of any
// dir-pagination-controls directives that may be looking for this ID.
var rawId = tAttrs.paginationId || DEFAULT_ID;
paginationService.registerInstance(rawId);
return function dirPaginationLinkFn(scope, element, attrs){
// Now that we have access to the `scope` we can interpolate any expression given in the paginationId attribute and
// potentially register a new ID if it evaluates to a different value than the rawId.
var paginationId = $parse(attrs.paginationId)(scope) || attrs.paginationId || DEFAULT_ID;
paginationService.registerInstance(paginationId);
var repeatExpression = getRepeatExpression(expression, paginationId);
addNgRepeatToElement(element, attrs, repeatExpression);
removeTemporaryAttributes(element);
var compiled = $compile(element);
var currentPageGetter = makeCurrentPageGetterFn(scope, attrs, paginationId);
paginationService.setCurrentPageParser(paginationId, currentPageGetter, scope);
if (typeof attrs.totalItems !== 'undefined') {
paginationService.setAsyncModeTrue(paginationId);
scope.$watch(function() {
return $parse(attrs.totalItems)(scope);
}, function (result) {
if (0 <= result) {
paginationService.setCollectionLength(paginationId, result);
}
});
} else {
scope.$watchCollection(function() {
return collectionGetter(scope);
}, function(collection) {
if (collection) {
paginationService.setCollectionLength(paginationId, collection.length);
}
});
}
// Delegate to the link function returned by the new compilation of the ng-repeat
compiled(scope);
};
}
/**
* If a pagination id has been specified, we need to check that it is present as the second argument passed to
* the itemsPerPage filter. If it is not there, we add it and return the modified expression.
*
* @param expression
* @param paginationId
* @returns {*}
*/
function getRepeatExpression(expression, paginationId) {
var repeatExpression,
idDefinedInFilter = !!expression.match(/(\|\s*itemsPerPage\s*:[^|]*:[^|]*)/);
if (paginationId !== DEFAULT_ID && !idDefinedInFilter) {
repeatExpression = expression.replace(/(\|\s*itemsPerPage\s*:[^|]*)/, "$1 : '" + paginationId + "'");
} else {
repeatExpression = expression;
}
return repeatExpression;
}
/**
* Adds the ng-repeat directive to the element. In the case of multi-element (-start, -end) it adds the
* appropriate multi-element ng-repeat to the first and last element in the range.
* @param element
* @param attrs
* @param repeatExpression
*/
function addNgRepeatToElement(element, attrs, repeatExpression) {
if (element[0].hasAttribute('dir-paginate-start') || element[0].hasAttribute('data-dir-paginate-start')) {
// using multiElement mode (dir-paginate-start, dir-paginate-end)
attrs.$set('ngRepeatStart', repeatExpression);
element.eq(element.length - 1).attr('ng-repeat-end', true);
} else {
attrs.$set('ngRepeat', repeatExpression);
}
}
/**
* Adds the dir-paginate-no-compile directive to each element in the tElement range.
* @param tElement
*/
function addNoCompileAttributes(tElement) {
angular.forEach(tElement, function(el) {
if (el.nodeType === Node.ELEMENT_NODE) {
angular.element(el).attr('dir-paginate-no-compile', true);
}
});
}
/**
* Removes the variations on dir-paginate (data-, -start, -end) and the dir-paginate-no-compile directives.
* @param element
*/
function removeTemporaryAttributes(element) {
angular.forEach(element, function(el) {
if (el.nodeType === Node.ELEMENT_NODE) {
angular.element(el).removeAttr('dir-paginate-no-compile');
}
});
element.eq(0).removeAttr('dir-paginate-start').removeAttr('dir-paginate').removeAttr('data-dir-paginate-start').removeAttr('data-dir-paginate');
element.eq(element.length - 1).removeAttr('dir-paginate-end').removeAttr('data-dir-paginate-end');
}
/**
* Creates a getter function for the current-page attribute, using the expression provided or a default value if
* no current-page expression was specified.
*
* @param scope
* @param attrs
* @param paginationId
* @returns {*}
*/
function makeCurrentPageGetterFn(scope, attrs, paginationId) {
var currentPageGetter;
if (attrs.currentPage) {
currentPageGetter = $parse(attrs.currentPage);
} else {
// if the current-page attribute was not set, we'll make our own
var defaultCurrentPage = paginationId + '__currentPage';
scope[defaultCurrentPage] = 1;
currentPageGetter = $parse(defaultCurrentPage);
}
return currentPageGetter;
}
}
/**
* This is a helper directive that allows correct compilation when in multi-element mode (ie dir-paginate-start, dir-paginate-end).
* It is dynamically added to all elements in the dir-paginate compile function, and it prevents further compilation of
* any inner directives. It is then removed in the link function, and all inner directives are then manually compiled.
*/
function noCompileDirective() {
return {
priority: 5000,
terminal: true
};
}
function dirPaginationControlsTemplateInstaller($templateCache) {
$templateCache.put('angularUtils.directives.dirPagination.template', '<ul class="pagination" ng-if="1 < pages.length"><li ng-if="boundaryLinks" ng-class="{ disabled : pagination.current == 1 }"><a href="" ng-click="setCurrent(1)">&laquo;</a></li><li ng-if="directionLinks" ng-class="{ disabled : pagination.current == 1 }"><a href="" ng-click="setCurrent(pagination.current - 1)">&lsaquo;</a></li><li ng-repeat="pageNumber in pages track by $index" ng-class="{ active : pagination.current == pageNumber, disabled : pageNumber == \'...\' }"><a href="" ng-click="setCurrent(pageNumber)">{{ pageNumber }}</a></li><li ng-if="directionLinks" ng-class="{ disabled : pagination.current == pagination.last }"><a href="" ng-click="setCurrent(pagination.current + 1)">&rsaquo;</a></li><li ng-if="boundaryLinks" ng-class="{ disabled : pagination.current == pagination.last }"><a href="" ng-click="setCurrent(pagination.last)">&raquo;</a></li></ul>');
}
function dirPaginationControlsDirective(paginationService, paginationTemplate) {
var numberRegex = /^\d+$/;
return {
restrict: 'AE',
templateUrl: function(elem, attrs) {
return attrs.templateUrl || paginationTemplate.getPath();
},
scope: {
maxSize: '=?',
onPageChange: '&?',
paginationId: '=?'
},
link: dirPaginationControlsLinkFn
};
function dirPaginationControlsLinkFn(scope, element, attrs) {
// rawId is the un-interpolated value of the pagination-id attribute. This is only important when the corresponding dir-paginate directive has
// not yet been linked (e.g. if it is inside an ng-if block), and in that case it prevents this controls directive from assuming that there is
// no corresponding dir-paginate directive and wrongly throwing an exception.
var rawId = attrs.paginationId || DEFAULT_ID;
var paginationId = scope.paginationId || attrs.paginationId || DEFAULT_ID;
if (!paginationService.isRegistered(paginationId) && !paginationService.isRegistered(rawId)) {
var idMessage = (paginationId !== DEFAULT_ID) ? ' (id: ' + paginationId + ') ' : ' ';
throw 'pagination directive: the pagination controls' + idMessage + 'cannot be used without the corresponding pagination directive.';
}
if (!scope.maxSize) { scope.maxSize = 9; }
scope.directionLinks = angular.isDefined(attrs.directionLinks) ? scope.$parent.$eval(attrs.directionLinks) : true;
scope.boundaryLinks = angular.isDefined(attrs.boundaryLinks) ? scope.$parent.$eval(attrs.boundaryLinks) : false;
var paginationRange = Math.max(scope.maxSize, 5);
scope.pages = [];
scope.pagination = {
last: 1,
current: 1
};
scope.range = {
lower: 1,
upper: 1,
total: 1
};
scope.$watch(function() {
return (paginationService.getCollectionLength(paginationId) + 1) * paginationService.getItemsPerPage(paginationId);
}, function(length) {
if (0 < length) {
generatePagination();
}
});
scope.$watch(function() {
return (paginationService.getItemsPerPage(paginationId));
}, function(current, previous) {
if (current != previous && typeof previous !== 'undefined') {
goToPage(scope.pagination.current);
}
});
scope.$watch(function() {
return paginationService.getCurrentPage(paginationId);
}, function(currentPage, previousPage) {
if (currentPage != previousPage) {
goToPage(currentPage);
}
});
scope.setCurrent = function(num) {
if (isValidPageNumber(num)) {
num = parseInt(num, 10);
paginationService.setCurrentPage(paginationId, num);
}
};
function goToPage(num) {
if (isValidPageNumber(num)) {
scope.pages = generatePagesArray(num, paginationService.getCollectionLength(paginationId), paginationService.getItemsPerPage(paginationId), paginationRange);
scope.pagination.current = num;
updateRangeValues();
// if a callback has been set, then call it with the page number as an argument
if (scope.onPageChange) {
scope.onPageChange({ newPageNumber : num });
}
}
}
function generatePagination() {
var page = parseInt(paginationService.getCurrentPage(paginationId)) || 1;
scope.pages = generatePagesArray(page, paginationService.getCollectionLength(paginationId), paginationService.getItemsPerPage(paginationId), paginationRange);
scope.pagination.current = page;
scope.pagination.last = scope.pages[scope.pages.length - 1];
if (scope.pagination.last < scope.pagination.current) {
scope.setCurrent(scope.pagination.last);
} else {
updateRangeValues();
}
}
/**
* This function updates the values (lower, upper, total) of the `scope.range` object, which can be used in the pagination
* template to display the current page range, e.g. "showing 21 - 40 of 144 results";
*/
function updateRangeValues() {
var currentPage = paginationService.getCurrentPage(paginationId),
itemsPerPage = paginationService.getItemsPerPage(paginationId),
totalItems = paginationService.getCollectionLength(paginationId);
scope.range.lower = (currentPage - 1) * itemsPerPage + 1;
scope.range.upper = Math.min(currentPage * itemsPerPage, totalItems);
scope.range.total = totalItems;
}
function isValidPageNumber(num) {
return (numberRegex.test(num) && (0 < num && num <= scope.pagination.last));
}
}
/**
* Generate an array of page numbers (or the '...' string) which is used in an ng-repeat to generate the
* links used in pagination
*
* @param currentPage
* @param collectionLength
* @param rowsPerPage
* @param paginationRange
* @returns {Array}
*/
function generatePagesArray(currentPage, collectionLength, rowsPerPage, paginationRange) {
var pages = [];
var totalPages = Math.ceil(collectionLength / rowsPerPage);
var halfWay = Math.ceil(paginationRange / 2);
var position;
if (currentPage <= halfWay) {
position = 'start';
} else if (totalPages - halfWay < currentPage) {
position = 'end';
} else {
position = 'middle';
}
var ellipsesNeeded = paginationRange < totalPages;
var i = 1;
while (i <= totalPages && i <= paginationRange) {
var pageNumber = calculatePageNumber(i, currentPage, paginationRange, totalPages);
var openingEllipsesNeeded = (i === 2 && (position === 'middle' || position === 'end'));
var closingEllipsesNeeded = (i === paginationRange - 1 && (position === 'middle' || position === 'start'));
if (ellipsesNeeded && (openingEllipsesNeeded || closingEllipsesNeeded)) {
pages.push('...');
} else {
pages.push(pageNumber);
}
i ++;
}
return pages;
}
/**
* Given the position in the sequence of pagination links [i], figure out what page number corresponds to that position.
*
* @param i
* @param currentPage
* @param paginationRange
* @param totalPages
* @returns {*}
*/
function calculatePageNumber(i, currentPage, paginationRange, totalPages) {
var halfWay = Math.ceil(paginationRange/2);
if (i === paginationRange) {
return totalPages;
} else if (i === 1) {
return i;
} else if (paginationRange < totalPages) {
if (totalPages - halfWay < currentPage) {
return totalPages - paginationRange + i;
} else if (halfWay < currentPage) {
return currentPage - halfWay + i;
} else {
return i;
}
} else {
return i;
}
}
}
/**
* This filter slices the collection into pages based on the current page number and number of items per page.
* @param paginationService
* @returns {Function}
*/
function itemsPerPageFilter(paginationService) {
return function(collection, itemsPerPage, paginationId) {
if (typeof (paginationId) === 'undefined') {
paginationId = DEFAULT_ID;
}
if (!paginationService.isRegistered(paginationId)) {
throw 'pagination directive: the itemsPerPage id argument (id: ' + paginationId + ') does not match a registered pagination-id.';
}
var end;
var start;
if (collection instanceof Array) {
itemsPerPage = parseInt(itemsPerPage) || 9999999999;
if (paginationService.isAsyncMode(paginationId)) {
start = 0;
} else {
start = (paginationService.getCurrentPage(paginationId) - 1) * itemsPerPage;
}
end = start + itemsPerPage;
paginationService.setItemsPerPage(paginationId, itemsPerPage);
return collection.slice(start, end);
} else {
return collection;
}
};
}
/**
* This service allows the various parts of the module to communicate and stay in sync.
*/
function paginationService() {
var instances = {};
var lastRegisteredInstance;
this.registerInstance = function(instanceId) {
if (typeof instances[instanceId] === 'undefined') {
instances[instanceId] = {
asyncMode: false
};
lastRegisteredInstance = instanceId;
}
};
this.isRegistered = function(instanceId) {
return (typeof instances[instanceId] !== 'undefined');
};
this.getLastInstanceId = function() {
return lastRegisteredInstance;
};
this.setCurrentPageParser = function(instanceId, val, scope) {
instances[instanceId].currentPageParser = val;
instances[instanceId].context = scope;
};
this.setCurrentPage = function(instanceId, val) {
instances[instanceId].currentPageParser.assign(instances[instanceId].context, val);
};
this.getCurrentPage = function(instanceId) {
var parser = instances[instanceId].currentPageParser;
return parser ? parser(instances[instanceId].context) : 1;
};
this.setItemsPerPage = function(instanceId, val) {
instances[instanceId].itemsPerPage = val;
};
this.getItemsPerPage = function(instanceId) {
return instances[instanceId].itemsPerPage;
};
this.setCollectionLength = function(instanceId, val) {
instances[instanceId].collectionLength = val;
};
this.getCollectionLength = function(instanceId) {
return instances[instanceId].collectionLength;
};
this.setAsyncModeTrue = function(instanceId) {
instances[instanceId].asyncMode = true;
};
this.isAsyncMode = function(instanceId) {
return instances[instanceId].asyncMode;
};
}
/**
* This provider allows global configuration of the template path used by the dir-pagination-controls directive.
*/
function paginationTemplateProvider() {
var templatePath = 'angularUtils.directives.dirPagination.template';
this.setPath = function(path) {
templatePath = path;
};
this.$get = function() {
return {
getPath: function() {
return templatePath;
}
};
};
}
})();
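
The window arithmetic above (generatePagesArray and calculatePageNumber) pins the first and last links, keeps the current page centred once it passes the half-way point of the range, and turns positions 2 and paginationRange - 1 into '...' when there are more pages than links. Below is a standalone Go transcription of the page-number arithmetic, handy for checking edge cases (Go to match the rest of this changeset; the function name is ours, not part of the library):

package main

import (
	"fmt"
	"math"
)

// pageNumber mirrors calculatePageNumber above: position i within the link
// range maps to an actual page number, holding the current page mid-window.
func pageNumber(i, current, rng, total int) int {
	half := int(math.Ceil(float64(rng) / 2))
	switch {
	case i == rng:
		return total
	case i == 1:
		return i
	case rng < total:
		if total-half < current {
			return total - rng + i
		} else if half < current {
			return current - half + i
		}
		return i
	default:
		return i
	}
}

func main() {
	// Ten links for 144 pages with page 72 current: the window tracks 72.
	for i := 1; i <= 10; i++ {
		fmt.Print(pageNumber(i, 72, 10, 144), " ")
	}
	fmt.Println() // 1 69 70 71 72 73 74 75 76 144 (positions 2 and 9 render as '...')
}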


File diff suppressed because one or more lines are too long


@@ -82,6 +82,7 @@ type FolderConfiguration struct {
Copiers int `xml:"copiers" json:"copiers"` // This defines how many files are handled concurrently.
Pullers int `xml:"pullers" json:"pullers"` // Defines how many blocks are fetched at the same time, possibly between separate copier routines.
Hashers int `xml:"hashers" json:"hashers"` // Less than one sets the value to the number of cores. These are CPU bound due to hashing.
Order PullOrder `xml:"order" json:"order"`
Invalid string `xml:"-" json:"invalid"` // Set at runtime when there is an error, not saved
@@ -678,3 +679,57 @@ func randomString(l int) string {
}
return string(bs)
}
type PullOrder int
const (
OrderRandom PullOrder = iota // default is random
OrderAlphabetic
OrderSmallestFirst
OrderLargestFirst
OrderOldestFirst
OrderNewestFirst
)
func (o PullOrder) String() string {
switch o {
case OrderRandom:
return "random"
case OrderAlphabetic:
return "alphabetic"
case OrderSmallestFirst:
return "smallestFirst"
case OrderLargestFirst:
return "largestFirst"
case OrderOldestFirst:
return "oldestFirst"
case OrderNewestFirst:
return "newestFirst"
default:
return "unknown"
}
}
func (o PullOrder) MarshalText() ([]byte, error) {
return []byte(o.String()), nil
}
func (o *PullOrder) UnmarshalText(bs []byte) error {
switch string(bs) {
case "random":
*o = OrderRandom
case "alphabetic":
*o = OrderAlphabetic
case "smallestFirst":
*o = OrderSmallestFirst
case "largestFirst":
*o = OrderLargestFirst
case "oldestFirst":
*o = OrderOldestFirst
case "newestFirst":
*o = OrderNewestFirst
default:
*o = OrderRandom
}
return nil
}
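
Since PullOrder implements MarshalText and UnmarshalText, encoding/xml (and encoding/json) serialize it through its string form, and unknown strings quietly fall back to random, which the f4 test case below exercises. A trimmed-down, self-contained sketch of the same pattern, with only two of the six values:

package main

import (
	"encoding/xml"
	"fmt"
)

// Trimmed-down copy of the PullOrder pattern above: an int enum that
// round-trips through its string form via TextMarshaler/TextUnmarshaler.
type PullOrder int

const (
	OrderRandom PullOrder = iota
	OrderAlphabetic
)

func (o PullOrder) MarshalText() ([]byte, error) {
	if o == OrderAlphabetic {
		return []byte("alphabetic"), nil
	}
	return []byte("random"), nil
}

func (o *PullOrder) UnmarshalText(bs []byte) error {
	if string(bs) == "alphabetic" {
		*o = OrderAlphabetic
	} else {
		*o = OrderRandom // unknown values fall back to the default
	}
	return nil
}

type folder struct {
	Order PullOrder `xml:"order"`
}

func main() {
	out, _ := xml.Marshal(folder{Order: OrderAlphabetic})
	fmt.Println(string(out)) // <folder><order>alphabetic</order></folder>

	var f folder
	xml.Unmarshal([]byte("<folder><order>whatever</order></folder>"), &f)
	fmt.Println(f.Order == OrderRandom) // true: unknown values default to random
}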


@@ -528,3 +528,51 @@ func TestCopy(t *testing.T) {
t.Error("Copy should be unchanged")
}
}
func TestPullOrder(t *testing.T) {
wrapper, err := Load("testdata/pullorder.xml", device1)
if err != nil {
t.Fatal(err)
}
folders := wrapper.Folders()
expected := []struct {
name string
order PullOrder
}{
{"f1", OrderRandom}, // empty value, default
{"f2", OrderRandom}, // explicit
{"f3", OrderAlphabetic}, // explicit
{"f4", OrderRandom}, // unknown value, default
{"f5", OrderSmallestFirst}, // explicit
{"f6", OrderLargestFirst}, // explicit
{"f7", OrderOldestFirst}, // explicit
{"f8", OrderNewestFirst}, // explicit
}
// Verify values are deserialized correctly
for _, tc := range expected {
if actual := folders[tc.name].Order; actual != tc.order {
t.Errorf("Incorrect pull order for %q: %v != %v", tc.name, actual, tc.order)
}
}
// Serialize and deserialize again to verify it survives the transformation
buf := new(bytes.Buffer)
cfg := wrapper.Raw()
cfg.WriteXML(buf)
t.Logf("%s", buf.Bytes())
cfg, err = ReadXML(buf, device1)
wrapper = Wrap("testdata/pullorder.xml", cfg)
folders = wrapper.Folders()
for _, tc := range expected {
if actual := folders[tc.name].Order; actual != tc.order {
t.Errorf("Incorrect pull order for %q: %v != %v", tc.name, actual, tc.order)
}
}
}

internal/config/testdata/pullorder.xml (vendored, new file, 25 lines)

@@ -0,0 +1,25 @@
<configuration version="10">
<folder id="f1" directory="testdata/">
</folder>
<folder id="f2" directory="testdata/">
<order>random</order>
</folder>
<folder id="f3" directory="testdata/">
<order>alphabetic</order>
</folder>
<folder id="f4" directory="testdata/">
<order>whatever</order>
</folder>
<folder id="f5" directory="testdata/">
<order>smallestFirst</order>
</folder>
<folder id="f6" directory="testdata/">
<order>largestFirst</order>
</folder>
<folder id="f7" directory="testdata/">
<order>oldestFirst</order>
</folder>
<folder id="f8" directory="testdata/">
<order>newestFirst</order>
</folder>
</configuration>


@@ -10,11 +10,11 @@ import (
"io/ioutil"
"os"
"path/filepath"
"sync"
"github.com/syncthing/protocol"
"github.com/syncthing/syncthing/internal/events"
"github.com/syncthing/syncthing/internal/osutil"
"github.com/syncthing/syncthing/internal/sync"
)
// An interface to handle configuration changes, and a wrapper type à la
@@ -49,7 +49,12 @@ type Wrapper struct {
// Wrap wraps an existing Configuration structure and ties it to a file on
// disk.
func Wrap(path string, cfg Configuration) *Wrapper {
w := &Wrapper{cfg: cfg, path: path}
w := &Wrapper{
cfg: cfg,
path: path,
mut: sync.NewMutex(),
sMut: sync.NewMutex(),
}
w.replaces = make(chan Configuration)
go w.Serve()
return w


@@ -17,11 +17,11 @@ import (
"bytes"
"encoding/binary"
"sort"
"sync"
"github.com/syncthing/protocol"
"github.com/syncthing/syncthing/internal/config"
"github.com/syncthing/syncthing/internal/osutil"
"github.com/syncthing/syncthing/internal/sync"
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/util"
@@ -123,7 +123,8 @@ func NewBlockFinder(db *leveldb.DB, cfg *config.Wrapper) *BlockFinder {
}
f := &BlockFinder{
db: db,
db: db,
mut: sync.NewRWMutex(),
}
f.Changed(cfg.Raw())
cfg.Subscribe(f)


@@ -10,10 +10,11 @@ import (
"crypto/rand"
"log"
"os"
"sync"
"testing"
"time"
"github.com/syncthing/syncthing/internal/sync"
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/opt"
"github.com/syndtr/goleveldb/leveldb/util"
@@ -132,7 +133,7 @@ func TestConcurrentSetClear(t *testing.T) {
dur := 30 * time.Second
t0 := time.Now()
var wg sync.WaitGroup
wg := sync.NewWaitGroup()
os.RemoveAll("testdata/concurrent-set-clear.db")
db, err := leveldb.OpenFile("testdata/concurrent-set-clear.db", &opt.Options{OpenFilesCacheCapacity: 10})
@@ -188,7 +189,7 @@ func TestConcurrentSetOnly(t *testing.T) {
dur := 30 * time.Second
t0 := time.Now()
var wg sync.WaitGroup
wg := sync.NewWaitGroup()
os.RemoveAll("testdata/concurrent-set-only.db")
db, err := leveldb.OpenFile("testdata/concurrent-set-only.db", &opt.Options{OpenFilesCacheCapacity: 10})


@@ -14,9 +14,9 @@ import (
"fmt"
"runtime"
"sort"
"sync"
"github.com/syncthing/protocol"
"github.com/syncthing/syncthing/internal/sync"
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/iterator"
"github.com/syndtr/goleveldb/leveldb/opt"
@@ -25,7 +25,7 @@ import (
var (
clockTick int64
clockMut sync.Mutex
clockMut sync.Mutex = sync.NewMutex()
)
func clock(v int64) int64 {


@@ -13,10 +13,9 @@
package db
import (
"sync"
"github.com/syncthing/protocol"
"github.com/syncthing/syncthing/internal/osutil"
"github.com/syncthing/syncthing/internal/sync"
"github.com/syndtr/goleveldb/leveldb"
)
@@ -50,6 +49,7 @@ func NewFileSet(folder string, db *leveldb.DB) *FileSet {
folder: folder,
db: db,
blockmap: NewBlockMap(db, folder),
mutex: sync.NewMutex(),
}
ldbCheckGlobals(db, []byte(folder))


@@ -9,12 +9,13 @@ package discover
import (
"fmt"
"net"
"sync"
"time"
"testing"
"github.com/syncthing/protocol"
"github.com/syncthing/syncthing/internal/sync"
)
var device protocol.DeviceID
@@ -97,7 +98,7 @@ func TestUDP4Success(t *testing.T) {
// Do a lookup in a separate routine
addrs := []string{}
wg := sync.WaitGroup{}
wg := sync.NewWaitGroup()
wg.Add(1)
go func() {
addrs = client.Lookup(device)
@@ -193,7 +194,7 @@ func TestUDP4Failure(t *testing.T) {
// Do a lookup in a separate routine
addrs := []string{}
wg := sync.WaitGroup{}
wg := sync.NewWaitGroup()
wg.Add(1)
go func() {
addrs = client.Lookup(device)


@@ -12,16 +12,19 @@ import (
"net"
"net/url"
"strconv"
"sync"
"time"
"github.com/syncthing/protocol"
"github.com/syncthing/syncthing/internal/sync"
)
func init() {
for _, proto := range []string{"udp", "udp4", "udp6"} {
Register(proto, func(uri *url.URL, pkt *Announce) (Client, error) {
c := &UDPClient{}
c := &UDPClient{
wg: sync.NewWaitGroup(),
mut: sync.NewRWMutex(),
}
err := c.Start(uri, pkt)
if err != nil {
return nil, err


@@ -13,12 +13,12 @@ import (
"io"
"net"
"strconv"
"sync"
"time"
"github.com/syncthing/protocol"
"github.com/syncthing/syncthing/internal/beacon"
"github.com/syncthing/syncthing/internal/events"
"github.com/syncthing/syncthing/internal/sync"
)
type Discoverer struct {
@@ -59,6 +59,8 @@ func NewDiscoverer(id protocol.DeviceID, addresses []string) *Discoverer {
negCacheCutoff: 3 * time.Minute,
registry: make(map[protocol.DeviceID][]CacheEntry),
lastLookup: make(map[protocol.DeviceID]time.Time),
registryLock: sync.NewRWMutex(),
mut: sync.NewRWMutex(),
}
}
@@ -140,7 +142,7 @@ func (d *Discoverer) StartGlobal(servers []string, extPort uint16) {
d.extPort = extPort
pkt := d.announcementPkt()
wg := sync.WaitGroup{}
wg := sync.NewWaitGroup()
clients := make(chan Client, len(servers))
for _, address := range servers {
wg.Add(1)
@@ -216,7 +218,7 @@ func (d *Discoverer) Lookup(device protocol.DeviceID) []string {
// server client and one local announcement interval has passed. This is
// to avoid finding local peers on their remote address at startup.
results := make(chan []string, len(d.clients))
wg := sync.WaitGroup{}
wg := sync.NewWaitGroup()
for _, client := range d.clients {
wg.Add(1)
go func(c Client) {


@@ -9,8 +9,10 @@ package events
import (
"errors"
"sync"
stdsync "sync"
"time"
"github.com/syncthing/syncthing/internal/sync"
)
type EventType int
@@ -101,7 +103,6 @@ type Subscription struct {
mask EventType
id int
events chan Event
mutex sync.Mutex
}
var Default = NewLogger()
@@ -113,7 +114,8 @@ var (
func NewLogger() *Logger {
return &Logger{
subs: make(map[int]*Subscription),
subs: make(map[int]*Subscription),
mutex: sync.NewMutex(),
}
}
@@ -168,9 +170,6 @@ func (l *Logger) Unsubscribe(s *Subscription) {
}
func (s *Subscription) Poll(timeout time.Duration) (Event, error) {
s.mutex.Lock()
defer s.mutex.Unlock()
if debug {
dl.Debugln("poll", timeout)
}
@@ -197,15 +196,16 @@ type BufferedSubscription struct {
next int
cur int
mut sync.Mutex
cond *sync.Cond
cond *stdsync.Cond
}
func NewBufferedSubscription(s *Subscription, size int) *BufferedSubscription {
bs := &BufferedSubscription{
sub: s,
buf: make([]Event, size),
mut: sync.NewMutex(),
}
bs.cond = sync.NewCond(&bs.mut)
bs.cond = stdsync.NewCond(bs.mut)
go bs.pollingLoop()
return bs
}
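
The cond change works because the new internal sync.Mutex is an interface whose method set (Lock, Unlock) satisfies the standard library's sync.Locker, so the interface value itself can back a Cond; no address-of is needed as with a struct mutex. A minimal self-contained sketch of the pattern (the Mutex interface is copied from internal/sync/sync.go, further down):

package main

import stdsync "sync"

// Mutex mirrors the interface from internal/sync/sync.go below.
type Mutex interface {
	Lock()
	Unlock()
}

func main() {
	var mut Mutex = &stdsync.Mutex{} // what NewMutex returns with debugging off

	// Any Mutex is also a stdsync.Locker, so it can back a Cond directly.
	cond := stdsync.NewCond(mut)
	done := false

	go func() {
		mut.Lock()
		done = true
		mut.Unlock()
		cond.Signal()
	}()

	mut.Lock()
	for !done {
		cond.Wait()
	}
	mut.Unlock()
}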


@@ -16,10 +16,10 @@ import (
"path/filepath"
"regexp"
"strings"
"sync"
"time"
"github.com/syncthing/syncthing/internal/fnmatch"
"github.com/syncthing/syncthing/internal/sync"
)
type Pattern struct {
@@ -48,6 +48,7 @@ func New(withCache bool) *Matcher {
m := &Matcher{
withCache: withCache,
stop: make(chan struct{}),
mut: sync.NewMutex(),
}
if withCache {
go m.clean(2 * time.Hour)


@@ -7,9 +7,8 @@
package model
import (
"sync"
"github.com/syncthing/protocol"
"github.com/syncthing/syncthing/internal/sync"
)
// deviceActivity tracks the number of outstanding requests per device and can
@@ -23,6 +22,7 @@ type deviceActivity struct {
func newDeviceActivity() *deviceActivity {
return &deviceActivity{
act: make(map[protocol.DeviceID]int),
mut: sync.NewMutex(),
}
}


@@ -7,10 +7,10 @@
package model
import (
"sync"
"time"
"github.com/syncthing/syncthing/internal/events"
"github.com/syncthing/syncthing/internal/sync"
)
type folderState int


@@ -18,7 +18,7 @@ import (
"os"
"path/filepath"
"strings"
"sync"
stdsync "sync"
"time"
"github.com/syncthing/protocol"
@@ -30,6 +30,7 @@ import (
"github.com/syncthing/syncthing/internal/scanner"
"github.com/syncthing/syncthing/internal/stats"
"github.com/syncthing/syncthing/internal/symlinks"
"github.com/syncthing/syncthing/internal/sync"
"github.com/syncthing/syncthing/internal/versioner"
"github.com/syndtr/goleveldb/leveldb"
)
@@ -85,7 +86,7 @@ type Model struct {
}
var (
SymlinkWarning = sync.Once{}
SymlinkWarning = stdsync.Once{}
)
// NewModel creates and starts a new model. The model starts in read-only mode,
@@ -113,6 +114,9 @@ func NewModel(cfg *config.Wrapper, id protocol.DeviceID, deviceName, clientName,
protoConn: make(map[protocol.DeviceID]protocol.Connection),
rawConn: make(map[protocol.DeviceID]io.Closer),
deviceVer: make(map[protocol.DeviceID]string),
fmut: sync.NewRWMutex(),
pmut: sync.NewRWMutex(),
}
if cfg.Options().ProgressUpdateIntervalS > -1 {
go m.progressEmitter.Serve()
@@ -125,8 +129,8 @@ func NewModel(cfg *config.Wrapper, id protocol.DeviceID, deviceName, clientName,
// the locks cannot be acquired in the given timeout period.
func (m *Model) StartDeadlockDetector(timeout time.Duration) {
l.Infof("Starting deadlock detector with %v timeout", timeout)
deadlockDetect(&m.fmut, timeout)
deadlockDetect(&m.pmut, timeout)
deadlockDetect(m.fmut, timeout)
deadlockDetect(m.pmut, timeout)
}
// StartRW starts read/write processing on the current model. When in
@@ -371,53 +375,71 @@ func (m *Model) NeedSize(folder string) (nfiles int, bytes int64) {
return
}
// NeedFiles returns the list of currently needed files in progress, queued,
// and to be queued on next puller iteration. Also takes a soft cap which is
// only respected when adding files from the model rather than the runner queue.
func (m *Model) NeedFolderFiles(folder string, max int) ([]db.FileInfoTruncated, []db.FileInfoTruncated, []db.FileInfoTruncated) {
// NeedFolderFiles returns a paginated list of currently needed files in
// progress, queued, and to be queued on next puller iteration, as well as the
// total number of files currently needed.
func (m *Model) NeedFolderFiles(folder string, page, perpage int) ([]db.FileInfoTruncated, []db.FileInfoTruncated, []db.FileInfoTruncated, int) {
m.fmut.RLock()
defer m.fmut.RUnlock()
if rf, ok := m.folderFiles[folder]; ok {
var progress, queued, rest []db.FileInfoTruncated
var seen map[string]bool
total := 0
runner, ok := m.folderRunners[folder]
if ok {
progressNames, queuedNames := runner.Jobs()
progress = make([]db.FileInfoTruncated, len(progressNames))
queued = make([]db.FileInfoTruncated, len(queuedNames))
seen = make(map[string]bool, len(progressNames)+len(queuedNames))
for i, name := range progressNames {
if f, ok := rf.GetGlobalTruncated(name); ok {
progress[i] = f
seen[name] = true
}
}
for i, name := range queuedNames {
if f, ok := rf.GetGlobalTruncated(name); ok {
queued[i] = f
seen[name] = true
}
}
}
left := max - len(progress) - len(queued)
if max < 1 || left > 0 {
rf.WithNeedTruncated(protocol.LocalDeviceID, func(f db.FileIntf) bool {
left--
ft := f.(db.FileInfoTruncated)
if !seen[ft.Name] {
rest = append(rest, ft)
}
return max < 1 || left > 0
})
}
return progress, queued, rest
rf, ok := m.folderFiles[folder]
if !ok {
return nil, nil, nil, 0
}
return nil, nil, nil
var progress, queued, rest []db.FileInfoTruncated
var seen map[string]struct{}
skip := (page - 1) * perpage
get := perpage
runner, ok := m.folderRunners[folder]
if ok {
allProgressNames, allQueuedNames := runner.Jobs()
var progressNames, queuedNames []string
progressNames, skip, get = getChunk(allProgressNames, skip, get)
queuedNames, skip, get = getChunk(allQueuedNames, skip, get)
progress = make([]db.FileInfoTruncated, len(progressNames))
queued = make([]db.FileInfoTruncated, len(queuedNames))
seen = make(map[string]struct{}, len(progressNames)+len(queuedNames))
for i, name := range progressNames {
if f, ok := rf.GetGlobalTruncated(name); ok {
progress[i] = f
seen[name] = struct{}{}
}
}
for i, name := range queuedNames {
if f, ok := rf.GetGlobalTruncated(name); ok {
queued[i] = f
seen[name] = struct{}{}
}
}
}
rest = make([]db.FileInfoTruncated, 0, perpage)
rf.WithNeedTruncated(protocol.LocalDeviceID, func(f db.FileIntf) bool {
total++
if skip > 0 {
skip--
return true
}
if get > 0 {
ft := f.(db.FileInfoTruncated)
if _, ok := seen[ft.Name]; !ok {
rest = append(rest, ft)
get--
}
}
return true
})
return progress, queued, rest, total
}
// Index is called when a new device is connected and we receive their full index.
@@ -1099,9 +1121,9 @@ func (m *Model) ScanFolders() map[string]error {
m.fmut.RUnlock()
errors := make(map[string]error, len(m.folderCfgs))
var errorsMut sync.Mutex
errorsMut := sync.NewMutex()
var wg sync.WaitGroup
wg := sync.NewWaitGroup()
wg.Add(len(folders))
for _, folder := range folders {
folder := folder
@@ -1561,9 +1583,13 @@ func (m *Model) CheckFolderHealth(id string) error {
}
m.fmut.RLock()
runner := m.folderRunners[folder.ID]
runner, runnerExists := m.folderRunners[folder.ID]
m.fmut.RUnlock()
_, _, oldErr := runner.getState()
var oldErr error
if runnerExists {
_, _, oldErr = runner.getState()
}
if err != nil {
if oldErr != nil && oldErr.Error() != err.Error() {
@@ -1571,10 +1597,14 @@ func (m *Model) CheckFolderHealth(id string) error {
} else if oldErr == nil {
l.Warnf("Stopping folder %q - %v", folder.ID, err)
}
runner.setError(err)
if runnerExists {
runner.setError(err)
}
} else if oldErr != nil {
l.Infof("Folder %q error is cleared, restarting", folder.ID)
runner.setState(FolderIdle)
if runnerExists {
runner.setState(FolderIdle)
}
}
return err
@@ -1604,3 +1634,17 @@ func symlinkInvalid(isLink bool) bool {
}
return false
}
// Skips `skip` elements and retrieves up to `get` elements from a given slice.
// Returns the resulting slice, plus how many elements are left to skip or
// fetch to satisfy the requested values, in case the slice is not big enough.
func getChunk(data []string, skip, get int) ([]string, int, int) {
l := len(data)
if l <= skip {
return []string{}, skip - l, get
} else if l < skip+get {
return data[skip:l], 0, get - (l - skip)
}
return data[skip : skip+get], 0, 0
}
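
To make the skip/get bookkeeping concrete: a page is assembled by draining the in-progress names, then the queued names, then the database iteration, with getChunk carrying the leftover skip and get counts from one source to the next. A standalone sketch with string slices standing in for the real sources:

package main

import "fmt"

// Same contract as getChunk above: consume up to `get` elements after
// skipping `skip`, and report how much of each remains unsatisfied.
func getChunk(data []string, skip, get int) ([]string, int, int) {
	l := len(data)
	if l <= skip {
		return []string{}, skip - l, get
	} else if l < skip+get {
		return data[skip:], 0, get - (l - skip)
	}
	return data[skip : skip+get], 0, 0
}

func main() {
	progress := []string{"a", "b"}
	queued := []string{"c", "d", "e", "f"}

	page, perpage := 2, 3 // second page, three items per page
	skip, get := (page-1)*perpage, perpage

	var out []string
	chunk, skip, get := getChunk(progress, skip, get)
	out = append(out, chunk...)
	chunk, skip, get = getChunk(queued, skip, get)
	out = append(out, chunk...)

	fmt.Println(out, skip, get) // [d e f] 0 0: page 2 starts after a, b, c
}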

View File

@@ -9,11 +9,11 @@ package model
import (
"path/filepath"
"reflect"
"sync"
"time"
"github.com/syncthing/syncthing/internal/config"
"github.com/syncthing/syncthing/internal/events"
"github.com/syncthing/syncthing/internal/sync"
)
type ProgressEmitter struct {
@@ -35,6 +35,7 @@ func NewProgressEmitter(cfg *config.Wrapper) *ProgressEmitter {
registry: make(map[string]*sharedPullerState),
last: make(map[string]map[string]*pullerProgress),
timer: time.NewTimer(time.Millisecond),
mut: sync.NewMutex(),
}
t.Changed(cfg.Raw())
cfg.Subscribe(t)


@@ -12,6 +12,7 @@ import (
"github.com/syncthing/syncthing/internal/config"
"github.com/syncthing/syncthing/internal/events"
"github.com/syncthing/syncthing/internal/sync"
)
var timeout = 10 * time.Millisecond
@@ -50,7 +51,9 @@ func TestProgressEmitter(t *testing.T) {
expectTimeout(w, t)
s := sharedPullerState{}
s := sharedPullerState{
mut: sync.NewMutex(),
}
p.Register(&s)
expectEvent(w, t, 1)


@@ -6,21 +6,34 @@
package model
import "sync"
import (
"math/rand"
"sort"
"github.com/syncthing/syncthing/internal/sync"
)
type jobQueue struct {
progress []string
queued []string
queued []jobQueueEntry
mut sync.Mutex
}
func newJobQueue() *jobQueue {
return &jobQueue{}
type jobQueueEntry struct {
name string
size int64
modified int64
}
func (q *jobQueue) Push(file string) {
func newJobQueue() *jobQueue {
return &jobQueue{
mut: sync.NewMutex(),
}
}
func (q *jobQueue) Push(file string, size, modified int64) {
q.mut.Lock()
q.queued = append(q.queued, file)
q.queued = append(q.queued, jobQueueEntry{file, size, modified})
q.mut.Unlock()
}
@@ -32,8 +45,7 @@ func (q *jobQueue) Pop() (string, bool) {
return "", false
}
var f string
f = q.queued[0]
f := q.queued[0].name
q.queued = q.queued[1:]
q.progress = append(q.progress, f)
@@ -45,7 +57,7 @@ func (q *jobQueue) BringToFront(filename string) {
defer q.mut.Unlock()
for i, cur := range q.queued {
if cur == filename {
if cur.name == filename {
if i > 0 {
// Shift the elements before the selected element one step to
// the right, overwriting the selected element
@@ -79,7 +91,62 @@ func (q *jobQueue) Jobs() ([]string, []string) {
copy(progress, q.progress)
queued := make([]string, len(q.queued))
copy(queued, q.queued)
for i := range q.queued {
queued[i] = q.queued[i].name
}
return progress, queued
}
func (q *jobQueue) Shuffle() {
q.mut.Lock()
defer q.mut.Unlock()
l := len(q.queued)
for i := range q.queued {
r := rand.Intn(l)
q.queued[i], q.queued[r] = q.queued[r], q.queued[i]
}
}
func (q *jobQueue) SortSmallestFirst() {
q.mut.Lock()
defer q.mut.Unlock()
sort.Sort(smallestFirst(q.queued))
}
func (q *jobQueue) SortLargestFirst() {
q.mut.Lock()
defer q.mut.Unlock()
sort.Sort(sort.Reverse(smallestFirst(q.queued)))
}
func (q *jobQueue) SortOldestFirst() {
q.mut.Lock()
defer q.mut.Unlock()
sort.Sort(oldestFirst(q.queued))
}
func (q *jobQueue) SortNewestFirst() {
q.mut.Lock()
defer q.mut.Unlock()
sort.Sort(sort.Reverse(oldestFirst(q.queued)))
}
// The usual sort.Interface boilerplate
type smallestFirst []jobQueueEntry
func (q smallestFirst) Len() int { return len(q) }
func (q smallestFirst) Less(a, b int) bool { return q[a].size < q[b].size }
func (q smallestFirst) Swap(a, b int) { q[a], q[b] = q[b], q[a] }
type oldestFirst []jobQueueEntry
func (q oldestFirst) Len() int { return len(q) }
func (q oldestFirst) Less(a, b int) bool { return q[a].modified < q[b].modified }
func (q oldestFirst) Swap(a, b int) { q[a], q[b] = q[b], q[a] }
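
The sort.Interface boilerplate above is what SortSmallestFirst and friends dispatch to; largest-first and newest-first are simply sort.Reverse over the same two orderings. A self-contained sketch (entry mirrors jobQueueEntry; the sizes are arbitrary):

package main

import (
	"fmt"
	"sort"
)

// entry mirrors jobQueueEntry above, with only the fields the sorters use.
type entry struct {
	name     string
	size     int64
	modified int64
}

type smallestFirst []entry

func (q smallestFirst) Len() int           { return len(q) }
func (q smallestFirst) Less(a, b int) bool { return q[a].size < q[b].size }
func (q smallestFirst) Swap(a, b int)      { q[a], q[b] = q[b], q[a] }

func main() {
	queued := []entry{{"f1", 20, 0}, {"f2", 40, 0}, {"f3", 30, 0}, {"f4", 10, 0}}

	sort.Sort(smallestFirst(queued)) // what SortSmallestFirst does
	fmt.Println(queued[0].name)      // f4

	sort.Sort(sort.Reverse(smallestFirst(queued))) // what SortLargestFirst does
	fmt.Println(queued[0].name)                    // f2
}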


@@ -15,10 +15,10 @@ import (
func TestJobQueue(t *testing.T) {
// Some random actions
q := newJobQueue()
q.Push("f1")
q.Push("f2")
q.Push("f3")
q.Push("f4")
q.Push("f1", 0, 0)
q.Push("f2", 0, 0)
q.Push("f3", 0, 0)
q.Push("f4", 0, 0)
progress, queued := q.Jobs()
if len(progress) != 0 || len(queued) != 4 {
@@ -43,7 +43,7 @@ func TestJobQueue(t *testing.T) {
t.Fatal("Wrong length", len(progress), len(queued))
}
q.Push(n)
q.Push(n, 0, 0)
progress, queued = q.Jobs()
if len(progress) != 0 || len(queued) != 4 {
t.Fatal("Wrong length")
@@ -120,10 +120,10 @@ func TestJobQueue(t *testing.T) {
func TestBringToFront(t *testing.T) {
q := newJobQueue()
q.Push("f1")
q.Push("f2")
q.Push("f3")
q.Push("f4")
q.Push("f1", 0, 0)
q.Push("f2", 0, 0)
q.Push("f3", 0, 0)
q.Push("f4", 0, 0)
_, queued := q.Jobs()
if !reflect.DeepEqual(queued, []string{"f1", "f2", "f3", "f4"}) {
@@ -159,12 +159,101 @@ func TestBringToFront(t *testing.T) {
}
}
func TestShuffle(t *testing.T) {
q := newJobQueue()
q.Push("f1", 0, 0)
q.Push("f2", 0, 0)
q.Push("f3", 0, 0)
q.Push("f4", 0, 0)
// This test will fail about once in eight million runs (1 / (4!)^5) :)
for i := 0; i < 5; i++ {
q.Shuffle()
_, queued := q.Jobs()
if l := len(queued); l != 4 {
t.Fatalf("Weird length %d returned from Jobs()", l)
}
t.Logf("%v", queued)
if !reflect.DeepEqual(queued, []string{"f1", "f2", "f3", "f4"}) {
// The queue was shuffled
return
}
}
t.Error("Queue was not shuffled after five attempts.")
}
func TestSortBySize(t *testing.T) {
q := newJobQueue()
q.Push("f1", 20, 0)
q.Push("f2", 40, 0)
q.Push("f3", 30, 0)
q.Push("f4", 10, 0)
q.SortSmallestFirst()
_, actual := q.Jobs()
if l := len(actual); l != 4 {
t.Fatalf("Weird length %d returned from Jobs()", l)
}
expected := []string{"f4", "f1", "f3", "f2"}
if !reflect.DeepEqual(actual, expected) {
t.Errorf("SortSmallestFirst(): %#v != %#v", actual, expected)
}
q.SortLargestFirst()
_, actual = q.Jobs()
if l := len(actual); l != 4 {
t.Fatalf("Weird length %d returned from Jobs()", l)
}
expected = []string{"f2", "f3", "f1", "f4"}
if !reflect.DeepEqual(actual, expected) {
t.Errorf("SortLargestFirst(): %#v != %#v", actual, expected)
}
}
func TestSortByAge(t *testing.T) {
q := newJobQueue()
q.Push("f1", 0, 20)
q.Push("f2", 0, 40)
q.Push("f3", 0, 30)
q.Push("f4", 0, 10)
q.SortOldestFirst()
_, actual := q.Jobs()
if l := len(actual); l != 4 {
t.Fatalf("Weird length %d returned from Jobs()", l)
}
expected := []string{"f4", "f1", "f3", "f2"}
if !reflect.DeepEqual(actual, expected) {
t.Errorf("SortOldestFirst(): %#v != %#v", actual, expected)
}
q.SortNewestFirst()
_, actual = q.Jobs()
if l := len(actual); l != 4 {
t.Fatalf("Weird length %d returned from Jobs()", l)
}
expected = []string{"f2", "f3", "f1", "f4"}
if !reflect.DeepEqual(actual, expected) {
t.Errorf("SortNewestFirst(): %#v != %#v", actual, expected)
}
}
func BenchmarkJobQueueBump(b *testing.B) {
files := genFiles(b.N)
q := newJobQueue()
for _, f := range files {
q.Push(f.Name)
q.Push(f.Name, 0, 0)
}
b.ResetTimer()
@@ -180,7 +269,7 @@ func BenchmarkJobQueuePushPopDone10k(b *testing.B) {
for i := 0; i < b.N; i++ {
q := newJobQueue()
for _, f := range files {
q.Push(f.Name)
q.Push(f.Name, 0, 0)
}
for _ = range files {
n, _ := q.Pop()


@@ -10,6 +10,8 @@ import (
"fmt"
"math/rand"
"time"
"github.com/syncthing/syncthing/internal/sync"
)
type roFolder struct {
@@ -23,11 +25,14 @@ type roFolder struct {
func newROFolder(model *Model, folder string, interval time.Duration) *roFolder {
return &roFolder{
stateTracker: stateTracker{folder: folder},
folder: folder,
intv: interval,
model: model,
stop: make(chan struct{}),
stateTracker: stateTracker{
folder: folder,
mut: sync.NewMutex(),
},
folder: folder,
intv: interval,
model: model,
stop: make(chan struct{}),
}
}
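
One consequence of swapping struct mutexes for the interface type, visible in this constructor: the zero value is no longer usable. Every struct literal now has to populate the field (here via sync.NewMutex()), because calling Lock on a nil interface panics at runtime. A minimal illustration:

package main

import (
	"fmt"
	"sync"
)

// Mutex mirrors the interface from internal/sync.
type Mutex interface {
	Lock()
	Unlock()
}

type tracker struct {
	mut Mutex
}

func main() {
	ok := tracker{mut: &sync.Mutex{}} // initialized, as the constructors now do
	ok.mut.Lock()
	ok.mut.Unlock()

	var bad tracker             // zero value: mut is a nil interface
	fmt.Println(bad.mut == nil) // true; bad.mut.Lock() here would panic
}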


@@ -13,7 +13,6 @@ import (
"math/rand"
"os"
"path/filepath"
"sync"
"time"
"github.com/syncthing/protocol"
@@ -24,6 +23,7 @@ import (
"github.com/syncthing/syncthing/internal/osutil"
"github.com/syncthing/syncthing/internal/scanner"
"github.com/syncthing/syncthing/internal/symlinks"
"github.com/syncthing/syncthing/internal/sync"
"github.com/syncthing/syncthing/internal/versioner"
)
@@ -69,6 +69,7 @@ type rwFolder struct {
copiers int
pullers int
shortID uint64
order config.PullOrder
stop chan struct{}
queue *jobQueue
@@ -77,7 +78,10 @@ type rwFolder struct {
func newRWFolder(m *Model, shortID uint64, cfg config.FolderConfiguration) *rwFolder {
return &rwFolder{
stateTracker: stateTracker{folder: cfg.ID},
stateTracker: stateTracker{
folder: cfg.ID,
mut: sync.NewMutex(),
},
model: m,
progressEmitter: m.progressEmitter,
@@ -90,6 +94,7 @@ func newRWFolder(m *Model, shortID uint64, cfg config.FolderConfiguration) *rwFo
copiers: cfg.Copiers,
pullers: cfg.Pullers,
shortID: shortID,
order: cfg.Order,
stop: make(chan struct{}),
queue: newJobQueue(),
@@ -118,6 +123,11 @@ func (p *rwFolder) Serve() {
var prevIgnoreHash string
rescheduleScan := func() {
if p.scanIntv == 0 {
// We should not run scans, so it should not be rescheduled.
return
}
// Sleep a random time between 3/4 and 5/4 of the configured interval.
sleepNanos := (p.scanIntv.Nanoseconds()*3 + rand.Int63n(2*p.scanIntv.Nanoseconds())) / 4
intv := time.Duration(sleepNanos) * time.Nanosecond
@@ -279,10 +289,10 @@ func (p *rwFolder) pullerIteration(ignores *ignore.Matcher) int {
copyChan := make(chan copyBlocksState)
finisherChan := make(chan *sharedPullerState)
var updateWg sync.WaitGroup
var copyWg sync.WaitGroup
var pullWg sync.WaitGroup
var doneWg sync.WaitGroup
updateWg := sync.NewWaitGroup()
copyWg := sync.NewWaitGroup()
pullWg := sync.NewWaitGroup()
doneWg := sync.NewWaitGroup()
if debug {
l.Debugln(p, "c", p.copiers, "p", p.pullers)
@@ -338,13 +348,9 @@ func (p *rwFolder) pullerIteration(ignores *ignore.Matcher) int {
buckets := map[string][]protocol.FileInfo{}
folderFiles.WithNeed(protocol.LocalDeviceID, func(intf db.FileIntf) bool {
// Needed items are delivered sorted lexicographically. This isn't
// really optimal from a performance point of view - it would be
// better if files were handled in random order, to spread the load
// over the cluster. But it means that we can be sure that we fully
// handle directories before the files that go inside them, which is
// nice.
// Needed items are delivered sorted lexicographically. We'll handle
// directories as they come along, so parents before children. Files
// are queued and the order may be changed later.
file := intf.(protocol.FileInfo)
@@ -384,13 +390,32 @@ func (p *rwFolder) pullerIteration(ignores *ignore.Matcher) int {
default:
// A new or changed file or symlink. This is the only case where we
// do stuff concurrently in the background
p.queue.Push(file.Name)
p.queue.Push(file.Name, file.Size(), file.Modified)
}
changed++
return true
})
// Reorder the file queue according to configuration
switch p.order {
case config.OrderRandom:
p.queue.Shuffle()
case config.OrderAlphabetic:
// The queue is already in alphabetic order.
case config.OrderSmallestFirst:
p.queue.SortSmallestFirst()
case config.OrderLargestFirst:
p.queue.SortLargestFirst()
case config.OrderOldestFirst:
p.queue.SortOldestFirst()
case config.OrderNewestFirst:
p.queue.SortNewestFirst()
}
// Process the file queue
nextFile:
for {
fileName, ok := p.queue.Pop()
@@ -799,6 +824,7 @@ func (p *rwFolder) handleFile(file protocol.FileInfo, copyChan chan<- copyBlocks
reused: reused,
ignorePerms: p.ignorePerms,
version: curFile.Version,
mut: sync.NewMutex(),
}
if debug {


@@ -393,7 +393,7 @@ func TestDeregisterOnFailInCopy(t *testing.T) {
}
// queue.Done should be called by the finisher routine
p.queue.Push("filex")
p.queue.Push("filex", 0, 0)
p.queue.Pop()
if len(p.queue.progress) != 1 {
@@ -480,7 +480,7 @@ func TestDeregisterOnFailInPull(t *testing.T) {
}
// queue.Done should be called by the finisher routine
p.queue.Push("filex")
p.queue.Push("filex", 0, 0)
p.queue.Pop()
if len(p.queue.progress) != 1 {


@@ -10,10 +10,10 @@ import (
"io"
"os"
"path/filepath"
"sync"
"github.com/syncthing/protocol"
"github.com/syncthing/syncthing/internal/db"
"github.com/syncthing/syncthing/internal/sync"
)
// A sharedPullerState is kept for each file that is being synced and is kept
@@ -59,8 +59,8 @@ type lockedWriterAt struct {
}
func (w lockedWriterAt) WriteAt(p []byte, off int64) (n int, err error) {
w.mut.Lock()
defer w.mut.Unlock()
(*w.mut).Lock()
defer (*w.mut).Unlock()
return w.wr.WriteAt(p, off)
}
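
The explicit dereference is required because the field is now a pointer to the interface type, and Go does not auto-dereference pointers to interfaces; the method set lives on the interface value, not on *Mutex. A two-step illustration:

package main

import "sync"

type Mutex interface {
	Lock()
	Unlock()
}

func main() {
	var m Mutex = &sync.Mutex{}
	p := &m // *Mutex: a pointer to an interface value
	// p.Lock() would not compile: *Mutex has no methods.
	(*p).Lock() // dereference first, then call through the interface
	(*p).Unlock()
}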


@@ -9,11 +9,14 @@ package model
import (
"os"
"testing"
"github.com/syncthing/syncthing/internal/sync"
)
func TestSourceFileOK(t *testing.T) {
s := sharedPullerState{
realName: "testdata/foo",
mut: sync.NewMutex(),
}
fd, err := s.sourceFile()
@@ -42,6 +45,7 @@ func TestSourceFileOK(t *testing.T) {
func TestSourceFileBad(t *testing.T) {
s := sharedPullerState{
realName: "nonexistent",
mut: sync.NewMutex(),
}
fd, err := s.sourceFile()
@@ -67,6 +71,7 @@ func TestReadOnlyDir(t *testing.T) {
s := sharedPullerState{
tempName: "testdata/read_only_dir/.temp_name",
mut: sync.NewMutex(),
}
fd, err := s.tempFile()


@@ -15,14 +15,15 @@ import (
"path/filepath"
"runtime"
"strings"
"sync"
"github.com/syncthing/syncthing/internal/sync"
)
var ErrNoHome = errors.New("No home directory found - set $HOME (or the platform equivalent).")
// Try to keep this entire operation atomic-like. We shouldn't be doing this
// often enough that there is any contention on this lock.
var renameLock sync.Mutex
var renameLock sync.Mutex = sync.NewMutex()
// TryRename renames a file, leaving source file intact in case of failure.
// Tries hard to succeed on various systems by temporarily tweaking directory


@@ -9,9 +9,9 @@ package scanner
import (
"os"
"path/filepath"
"sync"
"github.com/syncthing/protocol"
"github.com/syncthing/syncthing/internal/sync"
)
// The parallel hasher reads FileInfo structures from the inbox, hashes the
@@ -20,7 +20,7 @@ import (
// is closed and all items handled.
func newParallelHasher(dir string, blockSize, workers int, outbox, inbox chan protocol.FileInfo) {
var wg sync.WaitGroup
wg := sync.NewWaitGroup()
wg.Add(workers)
for i := 0; i < workers; i++ {

internal/sync/debug.go (new file, 31 lines)

@@ -0,0 +1,31 @@
// Copyright (C) 2015 The Syncthing Authors.
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
// You can obtain one at http://mozilla.org/MPL/2.0/.
package sync
import (
"os"
"strconv"
"strings"
"time"
"github.com/calmh/logger"
)
var (
debug = strings.Contains(os.Getenv("STTRACE"), "locks") || os.Getenv("STTRACE") == "all"
threshold = 100 * time.Millisecond
l = logger.DefaultLogger
)
func init() {
if n, err := strconv.Atoi(os.Getenv("STLOCKTHRESHOLD")); debug && err == nil {
threshold = time.Duration(n) * time.Millisecond
}
if debug {
l.Debugf("Enabling lock logging at %v threshold", threshold)
}
}

internal/sync/sync.go (new file, 141 lines)

@@ -0,0 +1,141 @@
// Copyright (C) 2015 The Syncthing Authors.
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
// You can obtain one at http://mozilla.org/MPL/2.0/.
package sync
import (
"fmt"
"path/filepath"
"runtime"
"strings"
"sync"
"sync/atomic"
"time"
)
type Mutex interface {
Lock()
Unlock()
}
type RWMutex interface {
Mutex
RLock()
RUnlock()
}
type WaitGroup interface {
Add(int)
Done()
Wait()
}
func NewMutex() Mutex {
if debug {
return &loggedMutex{}
}
return &sync.Mutex{}
}
func NewRWMutex() RWMutex {
if debug {
return &loggedRWMutex{
unlockers: make([]string, 0),
}
}
return &sync.RWMutex{}
}
func NewWaitGroup() WaitGroup {
if debug {
return &loggedWaitGroup{}
}
return &sync.WaitGroup{}
}
type loggedMutex struct {
sync.Mutex
start time.Time
lockedAt string
}
func (m *loggedMutex) Lock() {
m.Mutex.Lock()
m.start = time.Now()
m.lockedAt = getCaller()
}
func (m *loggedMutex) Unlock() {
duration := time.Now().Sub(m.start)
if duration >= threshold {
l.Debugf("Mutex held for %v. Locked at %s unlocked at %s", duration, m.lockedAt, getCaller())
}
m.Mutex.Unlock()
}
type loggedRWMutex struct {
sync.RWMutex
start time.Time
lockedAt string
logUnlockers uint32
unlockers []string
unlockersMut sync.Mutex
}
func (m *loggedRWMutex) Lock() {
start := time.Now()
atomic.StoreUint32(&m.logUnlockers, 1)
m.RWMutex.Lock()
atomic.StoreUint32(&m.logUnlockers, 0)
m.start = time.Now()
duration := m.start.Sub(start)
m.lockedAt = getCaller()
if duration > threshold {
l.Debugf("RWMutex took %v to lock. Locked at %s. RUnlockers while locking: %s", duration, m.lockedAt, strings.Join(m.unlockers, ", "))
}
m.unlockers = m.unlockers[:0]
}
func (m *loggedRWMutex) Unlock() {
duration := time.Now().Sub(m.start)
if duration >= threshold {
l.Debugf("RWMutex held for %v. Locked at %s: unlocked at %s", duration, m.lockedAt, getCaller())
}
m.RWMutex.Unlock()
}
func (m *loggedRWMutex) RUnlock() {
if atomic.LoadUint32(&m.logUnlockers) == 1 {
m.unlockersMut.Lock()
m.unlockers = append(m.unlockers, getCaller())
m.unlockersMut.Unlock()
}
m.RWMutex.RUnlock()
}
type loggedWaitGroup struct {
sync.WaitGroup
}
func (wg *loggedWaitGroup) Wait() {
start := time.Now()
wg.WaitGroup.Wait()
duration := time.Now().Sub(start)
if duration >= threshold {
l.Debugf("WaitGroup took %v at %s", duration, getCaller())
}
}
func getCaller() string {
_, file, line, _ := runtime.Caller(2)
file = filepath.Join(filepath.Base(filepath.Dir(file)), filepath.Base(file))
return fmt.Sprintf("%s:%d", file, line)
}
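
Call sites never see the wrappers; they just use the constructors, and debug.go above decides at startup whether the logged variants are returned (STTRACE=locks, with STLOCKTHRESHOLD in milliseconds overriding the 100 ms default). A sketch of what a slow critical section reports; this only builds inside the syncthing tree, since internal packages are not importable from outside:

package main

import (
	"time"

	"github.com/syncthing/syncthing/internal/sync"
)

func main() {
	mut := sync.NewMutex() // a loggedMutex when STTRACE=locks, else stdlib

	mut.Lock()
	time.Sleep(150 * time.Millisecond) // exceeds the default 100 ms threshold
	mut.Unlock()
	// With logging enabled, Unlock emits something like:
	//   Mutex held for 150.1ms. Locked at main/main.go:13 unlocked at main/main.go:15
}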

internal/sync/sync_test.go (new file, 185 lines)

@@ -0,0 +1,185 @@
// Copyright (C) 2015 The Syncthing Authors.
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
// You can obtain one at http://mozilla.org/MPL/2.0/.
package sync
import (
"strings"
"sync"
"testing"
"time"
"github.com/calmh/logger"
)
const (
logThreshold = 100 * time.Millisecond
shortWait = 5 * time.Millisecond
longWait = 125 * time.Millisecond
)
func TestTypes(t *testing.T) {
debug = false
if _, ok := NewMutex().(*sync.Mutex); !ok {
t.Error("Wrong type")
}
if _, ok := NewRWMutex().(*sync.RWMutex); !ok {
t.Error("Wrong type")
}
if _, ok := NewWaitGroup().(*sync.WaitGroup); !ok {
t.Error("Wrong type")
}
debug = true
if _, ok := NewMutex().(*loggedMutex); !ok {
t.Error("Wrong type")
}
if _, ok := NewRWMutex().(*loggedRWMutex); !ok {
t.Error("Wrong type")
}
if _, ok := NewWaitGroup().(*loggedWaitGroup); !ok {
t.Error("Wrong type")
}
debug = false
}
func TestMutex(t *testing.T) {
debug = true
threshold = logThreshold
msgmut := sync.Mutex{}
messages := make([]string, 0)
l.AddHandler(logger.LevelDebug, func(_ logger.LogLevel, message string) {
msgmut.Lock()
messages = append(messages, message)
msgmut.Unlock()
})
mut := NewMutex()
mut.Lock()
time.Sleep(shortWait)
mut.Unlock()
if len(messages) > 0 {
t.Errorf("Unexpected message count")
}
mut.Lock()
time.Sleep(longWait)
mut.Unlock()
if len(messages) != 1 {
t.Errorf("Unexpected message count")
}
debug = false
}
func TestRWMutex(t *testing.T) {
debug = true
threshold = logThreshold
msgmut := sync.Mutex{}
messages := make([]string, 0)
l.AddHandler(logger.LevelDebug, func(_ logger.LogLevel, message string) {
msgmut.Lock()
messages = append(messages, message)
msgmut.Unlock()
})
mut := NewRWMutex()
mut.Lock()
time.Sleep(shortWait)
mut.Unlock()
if len(messages) > 0 {
t.Errorf("Unexpected message count")
}
mut.Lock()
time.Sleep(longWait)
mut.Unlock()
if len(messages) != 1 {
t.Errorf("Unexpected message count")
}
// Testing rlocker logging
mut.RLock()
go func() {
time.Sleep(longWait)
mut.RUnlock()
}()
mut.Lock()
mut.Unlock()
if len(messages) != 2 {
t.Errorf("Unexpected message count")
}
if !strings.Contains(messages[1], "RUnlockers while locking: sync") || !strings.Contains(messages[1], "sync_test.go:") {
t.Error("Unexpected message")
}
// Testing multiple rlockers
mut.RLock()
mut.RLock()
mut.RLock()
mut.RUnlock()
mut.RUnlock()
mut.RUnlock()
debug = false
}
func TestWaitGroup(t *testing.T) {
debug = true
threshold = logThreshold
msgmut := sync.Mutex{}
messages := make([]string, 0)
l.AddHandler(logger.LevelDebug, func(_ logger.LogLevel, message string) {
msgmut.Lock()
messages = append(messages, message)
msgmut.Unlock()
})
wg := NewWaitGroup()
wg.Add(1)
go func() {
time.Sleep(shortWait)
wg.Done()
}()
wg.Wait()
if len(messages) > 0 {
t.Errorf("Unexpected message count")
}
wg = NewWaitGroup()
wg.Add(1)
go func() {
time.Sleep(longWait)
wg.Done()
}()
wg.Wait()
if len(messages) != 1 {
t.Errorf("Unexpected message count")
}
debug = false
}


@@ -22,8 +22,9 @@ import (
"net/url"
"regexp"
"strings"
"sync"
"time"
"github.com/syncthing/syncthing/internal/sync"
)
// A container for relevant properties of a UPnP InternetGatewayDevice.
@@ -129,7 +130,7 @@ func Discover(timeout time.Duration) []IGD {
}
}()
var wg sync.WaitGroup
wg := sync.NewWaitGroup()
for _, intf := range interfaces {
for _, deviceType := range []string{"urn:schemas-upnp-org:device:InternetGatewayDevice:1", "urn:schemas-upnp-org:device:InternetGatewayDevice:2"} {
wg.Add(1)
@@ -453,7 +454,7 @@ func soapRequest(url, service, function, message string) ([]byte, error) {
}
req.Header.Set("Content-Type", `text/xml; charset="utf-8"`)
req.Header.Set("User-Agent", "syncthing/1.0")
req.Header.Set("SOAPAction", fmt.Sprintf(`"%s#%s"`, service, function))
req.Header["SOAPAction"] = []string{fmt.Sprintf(`"%s#%s"`, service, function)} // Enforce capitalization in header-entry for sensitive routers. See issue #1696
req.Header.Set("Connection", "Close")
req.Header.Set("Cache-Control", "no-cache")
req.Header.Set("Pragma", "no-cache")


@@ -11,10 +11,10 @@ import (
"path/filepath"
"strconv"
"strings"
"sync"
"time"
"github.com/syncthing/syncthing/internal/osutil"
"github.com/syncthing/syncthing/internal/sync"
)
func init() {
@@ -33,7 +33,7 @@ type Staggered struct {
cleanInterval int64
folderPath string
interval [4]Interval
mutex *sync.Mutex
mutex sync.Mutex
}
// Rename versions with old version format
@@ -87,7 +87,6 @@ func NewStaggered(folderID, folderPath string, params map[string]string) Version
versionsDir = params["versionsPath"]
}
var mutex sync.Mutex
s := Staggered{
versionsPath: versionsDir,
cleanInterval: cleanInterval,
@@ -98,7 +97,7 @@ func NewStaggered(folderID, folderPath string, params map[string]string) Version
{86400, 592000}, // next 30 days -> 1 day between versions
{604800, maxAge}, // next year -> 1 week between versions
},
mutex: &mutex,
mutex: sync.NewMutex(),
}
if debug {

test/.gitignore (vendored, 3 lines changed)

@@ -17,3 +17,6 @@ s4d
http
h*/index*
*.syncthing-reset*
panic-*.log
audit-*.log
h*/config.xml.v*