Compare commits

20 Commits

Author  SHA1  Message  Date
Jakob Borg  50b37f1366  Revert "Add temporary debug logging for #344 (revert later)"  2014-07-08 11:49:28 +02:00
    This reverts commit 5353659f9f.
Jakob Borg  a7b6e35467  incomingIndexes should not be a package variable (fixes #344)  2014-07-08 11:49:11 +02:00
Ben Sidhom  37d83a4e2e  Continue discovery on connect errors (fixes #324)  2014-07-05 23:10:11 +02:00
    Continues trying to connect to the discovery server at regular intervals despite
    failure. Whether or not to retry and retry interval should be specified in
    configuration (not currently in this fix).
Jakob Borg  a720f90a70  Don't leak writer and index goroutines on close  2014-07-04 15:16:33 +02:00
Jakob Borg  4a6b43bcae  Clean up protocol locking and closing  2014-07-03 13:37:20 +02:00
Jakob Borg  2f5a822ca4  Send initial index in batches  2014-07-03 12:30:10 +02:00
Jakob Borg  bc1d04f0b9  Always send initial index, even if empty (ref #344)  2014-07-02 21:50:11 +02:00
Jakob Borg  381795d6d0  Simplify locking in protocol.Index  2014-07-02 21:49:24 +02:00
Jakob Borg  6ade27641d  Protocol state machine on receiving side  2014-07-02 21:33:30 +02:00
Jakob Borg  53898d2c60  Log client version on connect  2014-07-02 20:43:43 +02:00
Jakob Borg  91c4ff6009  Handle query parameters in UPnP control URL (fixes #211)  2014-07-02 20:28:03 +02:00
Jakob Borg  0aa067a726  Avoid deadlock during initial scan (fixes #389)  2014-07-02 07:40:27 +02:00
Jakob Borg  5353659f9f  Add temporary debug logging for #344 (revert later)  2014-07-01 17:08:14 +02:00
Jakob Borg  7ac00e189b  Tone down UPnP not found message (fixes #406)  2014-07-01 17:06:07 +02:00
Jakob Borg  a2da31056b  Increase deadlock timeout, make configurable (fixes #389, fixes #393)  2014-06-26 11:29:41 +02:00
Jakob Borg  2383579a64  Remove spurious debug output in .stignore handling  2014-06-23 21:54:28 +02:00
Jakob Borg  68750211ef  Connection notices are informational  2014-06-23 15:38:37 +02:00
Jakob Borg  db3e3ade80  No need to hold a write lock in Override  2014-06-23 11:52:13 +02:00
Jakob Borg  e6f04ed238  Don't whine about unexpected EOFs  2014-06-23 10:52:09 +02:00
Jakob Borg  a6eb690e31  Ensure correct version string format  2014-06-23 10:40:09 +02:00
6 changed files with 190 additions and 139 deletions

View File

@@ -18,6 +18,7 @@ import (
"os"
"os/exec"
"path/filepath"
"regexp"
"runtime"
"runtime/debug"
"runtime/pprof"
@@ -48,6 +49,14 @@ var (
var l = logger.DefaultLogger
func init() {
if Version != "unknown-dev" {
// If not a generic dev build, version string should come from git describe
exp := regexp.MustCompile(`^v\d+\.\d+\.\d+(-\d+-g[0-9a-f]+)?(-dirty)?$`)
if !exp.MatchString(Version) {
l.Fatalf("Invalid version string %q;\n\tdoes not match regexp %v", Version, exp)
}
}
stamp, _ := strconv.Atoi(BuildStamp)
BuildDate = time.Unix(int64(stamp), 0)
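
This init check (commit a6eb690e31, "Ensure correct version string format") rejects builds whose Version string was not produced by git describe. A rough sketch of what the regexp accepts; the sample strings below are made up:

	exp := regexp.MustCompile(`^v\d+\.\d+\.\d+(-\d+-g[0-9a-f]+)?(-dirty)?$`)
	exp.MatchString("v0.8.15")                   // true: release tag
	exp.MatchString("v0.8.15-24-g1234abc")       // true: 24 commits past the tag
	exp.MatchString("v0.8.15-24-g1234abc-dirty") // true: plus uncommitted changes
	exp.MatchString("0.8.15")                    // false: missing the leading "v"
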
@@ -106,7 +115,9 @@ The following enviroment variables are interpreted by syncthing:
STCPUPROFILE Write CPU profile to the specified file.
STGUIASSETS Directory to load GUI assets from. Overrides compiled in assets.`
STGUIASSETS Directory to load GUI assets from. Overrides compiled in assets.
STDEADLOCKTIMEOUT Alter deadlock detection timeout (seconds; default 1200).`
)
func init() {
@@ -472,7 +483,10 @@ func setupUPnP(r rand.Source) int {
l.Warnln("Failed to create UPnP port mapping")
}
} else {
l.Infof("No UPnP IGD device found, no port mapping created (%v)", err)
l.Infof("No UPnP gateway detected")
if debugNet {
l.Debugf("UPnP: %v", err)
}
}
}
} else {
@@ -694,6 +708,9 @@ next:
wr = &limitedWriter{conn, rateBucket}
}
protoConn := protocol.NewConnection(remoteID, conn, wr, m)
l.Infof("Connection to %s established at %v", remoteID, conn.RemoteAddr())
m.AddConnection(conn, protoConn)
continue next
}

View File

@@ -168,16 +168,21 @@ func (d *Discoverer) sendLocalAnnouncements() {
}
func (d *Discoverer) sendExternalAnnouncements() {
// this should go in the Discoverer struct
errorRetryIntv := 60 * time.Second
remote, err := net.ResolveUDPAddr("udp", d.extServer)
if err != nil {
l.Warnf("Global discovery: %v; no external announcements", err)
return
for err != nil {
l.Warnf("Global discovery: %v; trying again in %v", err, errorRetryIntv)
time.Sleep(errorRetryIntv)
remote, err = net.ResolveUDPAddr("udp", d.extServer)
}
conn, err := net.ListenUDP("udp", nil)
if err != nil {
l.Warnf("Global discovery: %v; no external announcements", err)
return
for err != nil {
l.Warnf("Global discovery: %v; trying again in %v", err, errorRetryIntv)
time.Sleep(errorRetryIntv)
conn, err = net.ListenUDP("udp", nil)
}
var buf []byte
@@ -198,7 +203,7 @@ func (d *Discoverer) sendExternalAnnouncements() {
l.Debugf("discover: send announcement -> %v\n%s", remote, hex.Dump(buf))
}
_, err = conn.WriteTo(buf, remote)
_, err := conn.WriteTo(buf, remote)
if err != nil {
if debug {
l.Debugln("discover: warning:", err)
@@ -222,7 +227,7 @@ func (d *Discoverer) sendExternalAnnouncements() {
if ok {
time.Sleep(d.globalBcastIntv)
} else {
time.Sleep(60 * time.Second)
time.Sleep(errorRetryIntv)
}
}
}
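
With commit 37d83a4e2e the external announcement routine no longer gives up when the discovery server cannot be resolved or the UDP socket cannot be opened; both steps now retry every errorRetryIntv (60 seconds), and as the commit message notes the interval is not yet configurable. The same retry-forever idea as a reusable helper, purely for illustration (the patch inlines the loops; the helper name is hypothetical):

	// keepTrying retries f on a fixed interval until it succeeds.
	func keepTrying(what string, intv time.Duration, f func() error) {
		for {
			err := f()
			if err == nil {
				return
			}
			l.Warnf("%s: %v; trying again in %v", what, err, intv)
			time.Sleep(intv)
		}
	}
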

View File

@@ -13,6 +13,7 @@ import (
"net"
"os"
"path/filepath"
"strconv"
"sync"
"time"
@@ -97,9 +98,16 @@ func NewModel(indexDir string, cfg *config.Configuration, clientName, clientVers
sup: suppressor{threshold: int64(cfg.Options.MaxChangeKbps)},
}
deadlockDetect(&m.rmut, 60*time.Second)
deadlockDetect(&m.smut, 60*time.Second)
deadlockDetect(&m.pmut, 60*time.Second)
var timeout = 20 * 60 // seconds
if t := os.Getenv("STDEADLOCKTIMEOUT"); len(t) > 0 {
it, err := strconv.Atoi(t)
if err == nil {
timeout = it
}
}
deadlockDetect(&m.rmut, time.Duration(timeout)*time.Second)
deadlockDetect(&m.smut, time.Duration(timeout)*time.Second)
deadlockDetect(&m.pmut, time.Duration(timeout)*time.Second)
go m.broadcastIndexLoop()
return m
}
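
Commit a2da31056b raises the default deadlock-detection timeout from 60 seconds to 20 minutes and makes it overridable through the STDEADLOCKTIMEOUT environment variable documented in the usage text above. The deadlockDetect helper itself is not shown in this diff; a minimal sketch of the kind of lock watchdog the call sites imply, assuming it panics when a mutex cannot be acquired within the timeout:

	// Illustrative sketch only; the real deadlockDetect is defined elsewhere in the package.
	func deadlockDetect(mut sync.Locker, timeout time.Duration) {
		go func() {
			for {
				time.Sleep(timeout / 2)
				ok := make(chan struct{})
				go func() {
					mut.Lock()
					mut.Unlock()
					close(ok)
				}()
				select {
				case <-ok:
					// lock was acquired promptly; keep watching
				case <-time.After(timeout):
					panic("possible deadlock")
				}
			}
		}()
	}
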
@@ -361,20 +369,14 @@ func (m *Model) ClusterConfig(nodeID string, config protocol.ClusterConfigMessag
m.nodeVer[nodeID] = config.ClientName + " " + config.ClientVersion
}
m.pmut.Unlock()
l.Infof(`Node %s client is "%s %s"`, nodeID, config.ClientName, config.ClientVersion)
}
// Close removes the peer from the model and closes the underlying connection if possible.
// Implements the protocol.Model interface.
func (m *Model) Close(node string, err error) {
if debug {
l.Debugf("%s: %v", node, err)
}
if err != io.EOF {
l.Warnf("Connection to %s closed: %v", node, err)
} else if _, ok := err.(ClusterConfigMismatch); ok {
l.Warnf("Connection to %s closed: %v", node, err)
}
l.Infof("Connection to %s closed: %v", node, err)
cid := m.cm.Get(node)
m.rmut.RLock()
@@ -451,19 +453,6 @@ func (m *Model) ReplaceLocal(repo string, fs []scanner.File) {
m.rmut.RUnlock()
}
func (m *Model) SeedLocal(repo string, fs []protocol.FileInfo) {
var sfs = make([]scanner.File, len(fs))
for i := 0; i < len(fs); i++ {
lamport.Default.Tick(fs[i].Version)
sfs[i] = fileFromFileInfo(fs[i])
sfs[i].Suppressed = false // we might have saved an index with files that were suppressed; the should not be on startup
}
m.rmut.RLock()
m.repoFiles[repo].Replace(cid.LocalID, sfs)
m.rmut.RUnlock()
}
func (m *Model) CurrentRepoFile(repo string, file string) scanner.File {
m.rmut.RLock()
f := m.repoFiles[repo].Get(cid.LocalID, file)
@@ -528,7 +517,14 @@ func (m *Model) AddConnection(rawConn io.Closer, protoConn protocol.Connection)
if debug {
l.Debugf("IDX(out/initial): %s: %q: %d files", nodeID, repo, len(idx))
}
protoConn.Index(repo, idx)
const batchSize = 1000
for i := 0; i < len(idx); i += batchSize {
if len(idx[i:]) < batchSize {
protoConn.Index(repo, idx[i:])
} else {
protoConn.Index(repo, idx[i:i+batchSize])
}
}
}
}()
}
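
Commit 2f5a822ca4 splits the initial index into chunks of at most batchSize (1000) files per Index call, presumably to keep individual protocol messages bounded in size: a 2500-file repository, for example, produces three calls carrying 1000, 1000 and 500 files. The same slicing as a standalone helper, for illustration only (not part of the patch):

	// batches splits idx into slices of at most size entries.
	func batches(idx []protocol.FileInfo, size int) [][]protocol.FileInfo {
		var out [][]protocol.FileInfo
		for i := 0; i < len(idx); i += size {
			end := i + size
			if end > len(idx) {
				end = len(idx)
			}
			out = append(out, idx[i:end])
		}
		return out
	}
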
@@ -733,7 +729,15 @@ func (m *Model) LoadIndexes(dir string) {
m.rmut.RLock()
for repo := range m.repoCfgs {
fs := m.loadIndex(repo, dir)
m.SeedLocal(repo, fs)
var sfs = make([]scanner.File, len(fs))
for i := 0; i < len(fs); i++ {
lamport.Default.Tick(fs[i].Version)
sfs[i] = fileFromFileInfo(fs[i])
sfs[i].Suppressed = false // we might have saved an index with files that were suppressed; the should not be on startup
}
m.repoFiles[repo].Replace(cid.LocalID, sfs)
}
m.rmut.RUnlock()
}
@@ -857,8 +861,10 @@ func (m *Model) State(repo string) string {
func (m *Model) Override(repo string) {
fs := m.NeedFilesRepo(repo)
m.rmut.Lock()
m.rmut.RLock()
r := m.repoFiles[repo]
m.rmut.RUnlock()
for i := range fs {
f := &fs[i]
h := r.Get(cid.LocalID, f.Name)
@@ -872,7 +878,6 @@ func (m *Model) Override(repo string) {
}
f.Version = lamport.Default.Tick(f.Version)
}
m.rmut.Unlock()
r.Update(cid.LocalID, fs)
}

View File

@@ -28,6 +28,12 @@ const (
messageTypeIndexUpdate = 6
)
const (
stateInitial = iota
stateCCRcvd
stateIdxRcvd
)
const (
FlagDeleted uint32 = 1 << 12
FlagInvalid = 1 << 13
@@ -70,26 +76,36 @@ type Connection interface {
type rawConnection struct {
id string
receiver Model
state int
reader io.ReadCloser
cr *countingReader
xr *xdr.Reader
writer io.WriteCloser
cw *countingWriter
wb *bufio.Writer
xw *xdr.Writer
cw *countingWriter
wb *bufio.Writer
xw *xdr.Writer
wmut sync.Mutex
awaiting []chan asyncResult
awaitingMut sync.Mutex
indexSent map[string]map[string]uint64
awaiting []chan asyncResult
imut sync.Mutex
idxMut sync.Mutex // ensures serialization of Index calls
idxSent map[string]map[string]uint64
idxMut sync.Mutex // ensures serialization of Index calls
nextID chan int
outbox chan []encodable
closed chan struct{}
once sync.Once
incomingIndexes chan incomingIndex
}
type incomingIndex struct {
update bool
id string
repo string
files []FileInfo
}
type asyncResult struct {
@@ -114,20 +130,22 @@ func NewConnection(nodeID string, reader io.Reader, writer io.Writer, receiver M
wb := bufio.NewWriter(flwr)
c := rawConnection{
id: nodeID,
receiver: nativeModel{receiver},
reader: flrd,
cr: cr,
xr: xdr.NewReader(flrd),
writer: flwr,
cw: cw,
wb: wb,
xw: xdr.NewWriter(wb),
awaiting: make([]chan asyncResult, 0x1000),
indexSent: make(map[string]map[string]uint64),
outbox: make(chan []encodable),
nextID: make(chan int),
closed: make(chan struct{}),
id: nodeID,
receiver: nativeModel{receiver},
state: stateInitial,
reader: flrd,
cr: cr,
xr: xdr.NewReader(flrd),
writer: flwr,
cw: cw,
wb: wb,
xw: xdr.NewWriter(wb),
awaiting: make([]chan asyncResult, 0x1000),
idxSent: make(map[string]map[string]uint64),
outbox: make(chan []encodable),
nextID: make(chan int),
closed: make(chan struct{}),
incomingIndexes: make(chan incomingIndex, 100), // should be enough for anyone, right?
}
go c.indexSerializerLoop()
@@ -148,31 +166,29 @@ func (c *rawConnection) Index(repo string, idx []FileInfo) {
c.idxMut.Lock()
defer c.idxMut.Unlock()
c.imut.Lock()
var msgType int
if c.indexSent[repo] == nil {
if c.idxSent[repo] == nil {
// This is the first time we send an index.
msgType = messageTypeIndex
c.indexSent[repo] = make(map[string]uint64)
c.idxSent[repo] = make(map[string]uint64)
for _, f := range idx {
c.indexSent[repo][f.Name] = f.Version
c.idxSent[repo][f.Name] = f.Version
}
} else {
// We have sent one full index. Only send updates now.
msgType = messageTypeIndexUpdate
var diff []FileInfo
for _, f := range idx {
if vs, ok := c.indexSent[repo][f.Name]; !ok || f.Version != vs {
if vs, ok := c.idxSent[repo][f.Name]; !ok || f.Version != vs {
diff = append(diff, f)
c.indexSent[repo][f.Name] = f.Version
c.idxSent[repo][f.Name] = f.Version
}
}
idx = diff
}
c.imut.Unlock()
if len(idx) > 0 {
if msgType == messageTypeIndex || len(idx) > 0 {
c.send(header{0, -1, msgType}, IndexMessage{repo, idx})
}
}
@@ -186,13 +202,13 @@ func (c *rawConnection) Request(repo string, name string, offset int64, size int
return nil, ErrClosed
}
c.imut.Lock()
c.awaitingMut.Lock()
if ch := c.awaiting[id]; ch != nil {
panic("id taken")
}
rc := make(chan asyncResult)
rc := make(chan asyncResult, 1)
c.awaiting[id] = rc
c.imut.Unlock()
c.awaitingMut.Unlock()
ok := c.send(header{0, id, messageTypeRequest},
RequestMessage{repo, name, uint64(offset), uint32(size)})
@@ -221,9 +237,9 @@ func (c *rawConnection) ping() bool {
}
rc := make(chan asyncResult, 1)
c.imut.Lock()
c.awaitingMut.Lock()
c.awaiting[id] = rc
c.imut.Unlock()
c.awaitingMut.Unlock()
ok := c.send(header{0, id, messageTypePing})
if !ok {
@@ -257,21 +273,34 @@ func (c *rawConnection) readerLoop() (err error) {
switch hdr.msgType {
case messageTypeIndex:
if c.state < stateCCRcvd {
return fmt.Errorf("protocol error: index message in state %d", c.state)
}
if err := c.handleIndex(); err != nil {
return err
}
c.state = stateIdxRcvd
case messageTypeIndexUpdate:
if c.state < stateIdxRcvd {
return fmt.Errorf("protocol error: index update message in state %d", c.state)
}
if err := c.handleIndexUpdate(); err != nil {
return err
}
case messageTypeRequest:
if c.state < stateIdxRcvd {
return fmt.Errorf("protocol error: request message in state %d", c.state)
}
if err := c.handleRequest(hdr); err != nil {
return err
}
case messageTypeResponse:
if c.state < stateIdxRcvd {
return fmt.Errorf("protocol error: response message in state %d", c.state)
}
if err := c.handleResponse(hdr); err != nil {
return err
}
@@ -283,9 +312,13 @@ func (c *rawConnection) readerLoop() (err error) {
c.handlePong(hdr)
case messageTypeClusterConfig:
if c.state != stateInitial {
return fmt.Errorf("protocol error: cluster config message in state %d", c.state)
}
if err := c.handleClusterConfig(); err != nil {
return err
}
c.state = stateCCRcvd
default:
return fmt.Errorf("protocol error: %s: unknown message type %#x", c.id, hdr.msgType)
@@ -293,15 +326,6 @@ func (c *rawConnection) readerLoop() (err error) {
}
}
type incomingIndex struct {
update bool
id string
repo string
files []FileInfo
}
var incomingIndexes = make(chan incomingIndex, 100) // should be enough for anyone, right?
func (c *rawConnection) indexSerializerLoop() {
// We must avoid blocking the reader loop when processing large indexes.
// There is otherwise a potential deadlock where both sides has the model
@@ -309,11 +333,16 @@ func (c *rawConnection) indexSerializerLoop() {
// large index update from the other side. But we must also ensure to
// process the indexes in the order they are received, hence the separate
// routine and buffered channel.
for ii := range incomingIndexes {
if ii.update {
c.receiver.IndexUpdate(ii.id, ii.repo, ii.files)
} else {
c.receiver.Index(ii.id, ii.repo, ii.files)
for {
select {
case ii := <-c.incomingIndexes:
if ii.update {
c.receiver.IndexUpdate(ii.id, ii.repo, ii.files)
} else {
c.receiver.Index(ii.id, ii.repo, ii.files)
}
case <-c.closed:
return
}
}
}
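
The comment above gives the rationale for the serializer: received indexes are handed to a dedicated goroutine through a buffered channel so the reader loop never blocks inside the model while the peer may be doing the same. Commit a7b6e35467 moves incomingIndexes from a package-level variable onto the rawConnection struct, so the queue is no longer shared between connections, and the new select on c.closed lets the serializer goroutine exit when its connection shuts down instead of leaking (a720f90a70). The general shape of the pattern, reduced to a sketch with hypothetical names:

	// One producer enqueues work without calling into the receiver;
	// one consumer preserves ordering and stops when closed is closed.
	func serialize(work <-chan func(), closed <-chan struct{}) {
		for {
			select {
			case f := <-work:
				f()
			case <-closed:
				return
			}
		}
	}
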
@@ -332,7 +361,7 @@ func (c *rawConnection) handleIndex() error {
// update and can't receive the large index update from the
// other side.
incomingIndexes <- incomingIndex{false, c.id, im.Repository, im.Files}
c.incomingIndexes <- incomingIndex{false, c.id, im.Repository, im.Files}
}
return nil
}
@@ -343,7 +372,7 @@ func (c *rawConnection) handleIndexUpdate() error {
if err := c.xr.Error(); err != nil {
return err
} else {
incomingIndexes <- incomingIndex{true, c.id, im.Repository, im.Files}
c.incomingIndexes <- incomingIndex{true, c.id, im.Repository, im.Files}
}
return nil
}
@@ -365,32 +394,25 @@ func (c *rawConnection) handleResponse(hdr header) error {
return err
}
go func(hdr header, err error) {
c.imut.Lock()
rc := c.awaiting[hdr.msgID]
c.awaitingMut.Lock()
if rc := c.awaiting[hdr.msgID]; rc != nil {
c.awaiting[hdr.msgID] = nil
c.imut.Unlock()
if rc != nil {
rc <- asyncResult{data, err}
close(rc)
}
}(hdr, c.xr.Error())
rc <- asyncResult{data, nil}
close(rc)
}
c.awaitingMut.Unlock()
return nil
}
func (c *rawConnection) handlePong(hdr header) {
c.imut.Lock()
c.awaitingMut.Lock()
if rc := c.awaiting[hdr.msgID]; rc != nil {
go func() {
rc <- asyncResult{}
close(rc)
}()
c.awaiting[hdr.msgID] = nil
rc <- asyncResult{}
close(rc)
}
c.imut.Unlock()
c.awaitingMut.Unlock()
}
func (c *rawConnection) handleClusterConfig() error {
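
The response and pong handlers no longer spawn a goroutine per reply. That is safe because the reply channels are now created with a buffer of one (see the Request and ping hunks above), so the single send can never block the reader loop; the dedicated awaitingMut also takes over from the overloaded imut for guarding the awaiting slice. The property being relied on, in isolation:

	rc := make(chan asyncResult, 1) // capacity 1
	rc <- asyncResult{}             // never blocks: the one reply fits in the buffer
	close(rc)                       // the receiver sees the reply, then the close
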
@@ -434,18 +456,20 @@ func (c *rawConnection) send(h header, es ...encodable) bool {
func (c *rawConnection) writerLoop() {
var err error
for es := range c.outbox {
c.wmut.Lock()
for _, e := range es {
e.encodeXDR(c.xw)
}
for {
select {
case es := <-c.outbox:
for _, e := range es {
e.encodeXDR(c.xw)
}
if err = c.flush(); err != nil {
c.wmut.Unlock()
c.close(err)
if err = c.flush(); err != nil {
c.close(err)
return
}
case <-c.closed:
return
}
c.wmut.Unlock()
}
}
@@ -470,29 +494,20 @@ func (c *rawConnection) flush() error {
}
func (c *rawConnection) close(err error) {
c.imut.Lock()
c.wmut.Lock()
defer c.imut.Unlock()
defer c.wmut.Unlock()
select {
case <-c.closed:
return
default:
c.once.Do(func() {
close(c.closed)
c.awaitingMut.Lock()
for i, ch := range c.awaiting {
if ch != nil {
close(ch)
c.awaiting[i] = nil
}
}
c.writer.Close()
c.reader.Close()
c.awaitingMut.Unlock()
go c.receiver.Close(c.id, err)
}
})
}
func (c *rawConnection) idGenerator() {
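
close is now built around sync.Once: the first caller closes the closed channel, closes any outstanding awaiting channels, closes the reader and writer, and notifies the receiver, while any later call is a no-op. The earlier version guarded the same work with imut/wmut and a select on closed. Together with writerLoop selecting on c.closed above, this is what stops the writer and index goroutines from leaking after a connection closes (a720f90a70). The core idiom, in isolation:

	// Idempotent shutdown: only the first call does any work.
	var once sync.Once
	closed := make(chan struct{})
	shutdown := func() { once.Do(func() { close(closed) }) }
	shutdown()
	shutdown() // safe no-op; the channel is closed exactly once
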
@@ -554,8 +569,7 @@ func (c *rawConnection) pingerLoop() {
func (c *rawConnection) processRequest(msgID int, req RequestMessage) {
data, _ := c.receiver.Request(c.id, req.Repository, req.Name, int64(req.Offset), int(req.Size))
c.send(header{0, msgID, messageTypeResponse},
encodableBytes(data))
c.send(header{0, msgID, messageTypeResponse}, encodableBytes(data))
}
type Statistics struct {

View File

@@ -106,7 +106,6 @@ func (w *Walker) loadIgnoreFiles(dir string, ign map[string][]string) filepath.W
if pn, sn := filepath.Split(rn); sn == w.IgnoreFile {
pn := filepath.Clean(pn)
l.Debugf("pn: %q", pn)
bs, _ := ioutil.ReadFile(p)
lines := bytes.Split(bs, []byte("\n"))
var patterns []string
@@ -287,7 +286,6 @@ func (w *Walker) ignoreFile(patterns map[string][]string, file string) bool {
for prefix, pats := range patterns {
if prefix == "." || prefix == first || strings.HasPrefix(first, fmt.Sprintf("%s%c", prefix, os.PathSeparator)) {
for _, pattern := range pats {
l.Debugf("%q %q", pattern, last)
if match, _ := filepath.Match(pattern, last); match {
return true
}

View File

@@ -203,14 +203,26 @@ func getServiceURL(rootURL string) (string, error) {
}
u, _ := url.Parse(rootURL)
if svc.ControlURL[0] == '/' {
u.Path = svc.ControlURL
} else {
u.Path += svc.ControlURL
}
replaceRawPath(u, svc.ControlURL)
return u.String(), nil
}
func replaceRawPath(u *url.URL, rp string) {
var p, q string
fs := strings.Split(rp, "?")
p = fs[0]
if len(fs) > 1 {
q = fs[1]
}
if p[0] == '/' {
u.Path = p
} else {
u.Path += p
}
u.RawQuery = q
}
func soapRequest(url, function, message string) error {
tpl := `<?xml version="1.0" ?>
<s:Envelope xmlns:s="http://schemas.xmlsoap.org/soap/envelope/" s:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/">