Compare commits

..

10 Commits

Author SHA1 Message Date
Jakob Borg
4a6b43bcae Clean up protocol locking and closing 2014-07-03 13:37:20 +02:00
Jakob Borg
2f5a822ca4 Send initial index in batches 2014-07-03 12:30:10 +02:00
Jakob Borg
bc1d04f0b9 Always send initial index, even if empty (ref #344) 2014-07-02 21:50:11 +02:00
Jakob Borg
381795d6d0 Simplify locking in protocol.Index 2014-07-02 21:49:24 +02:00
Jakob Borg
6ade27641d Protocol state machine on receiving side 2014-07-02 21:33:30 +02:00
Jakob Borg
53898d2c60 Log client version on connect 2014-07-02 20:43:43 +02:00
Jakob Borg
91c4ff6009 Handle query parameters in UPnP control URL (fixes #211) 2014-07-02 20:28:03 +02:00
Jakob Borg
0aa067a726 Avoid deadlock during initial scan (fixes #389) 2014-07-02 07:40:27 +02:00
Jakob Borg
5353659f9f Add temporary debug logging for #344 (revert later) 2014-07-01 17:08:14 +02:00
Jakob Borg
7ac00e189b Tone down UPnP not found message (fixes #406) 2014-07-01 17:06:07 +02:00
6 changed files with 173 additions and 92 deletions

View File

@@ -483,7 +483,10 @@ func setupUPnP(r rand.Source) int {
l.Warnln("Failed to create UPnP port mapping")
}
} else {
l.Infof("No UPnP IGD device found, no port mapping created (%v)", err)
l.Infof("No UPnP gateway detected")
if debugNet {
l.Debugf("UPnP: %v", err)
}
}
}
} else {

View File

@@ -49,6 +49,7 @@ func (m *Set) Replace(id uint, fs []scanner.File) {
}
m.Lock()
log("Replace", id, len(fs))
if len(fs) == 0 || !m.equals(id, fs) {
m.changes[id]++
m.replace(id, fs)
@@ -65,6 +66,7 @@ func (m *Set) ReplaceWithDelete(id uint, fs []scanner.File) {
}
m.Lock()
log("ReplaceWithDelete", id, len(fs))
if len(fs) == 0 || !m.equals(id, fs) {
m.changes[id]++
@@ -102,7 +104,9 @@ func (m *Set) Update(id uint, fs []scanner.File) {
if debug {
l.Debugf("Update(%d, [%d])", id, len(fs))
}
m.Lock()
log("Update", id, len(fs))
m.update(id, fs)
m.changes[id]++
m.Unlock()
@@ -220,6 +224,7 @@ func (m *Set) equals(id uint, fs []scanner.File) bool {
func (m *Set) update(cid uint, fs []scanner.File) {
remFiles := m.remoteKey[cid]
if remFiles == nil {
printLog()
l.Fatalln("update before replace for cid", cid)
}
for _, f := range fs {

51
files/set_debug.go Normal file
View File

@@ -0,0 +1,51 @@
package files
import (
"fmt"
"time"
"github.com/calmh/syncthing/cid"
)
type logEntry struct {
time time.Time
method string
cid uint
node string
nfiles int
}
func (l logEntry) String() string {
return fmt.Sprintf("%v: %s cid:%d node:%s nfiles:%d", l.time, l.method, l.cid, l.node, l.nfiles)
}
var (
debugLog [10]logEntry
debugNext int
cm *cid.Map
)
func SetCM(m *cid.Map) {
cm = m
}
func log(method string, id uint, nfiles int) {
e := logEntry{
time: time.Now(),
method: method,
cid: id,
nfiles: nfiles,
}
if cm != nil {
e.node = cm.Name(id)
}
debugLog[debugNext] = e
debugNext = (debugNext + 1) % len(debugLog)
}
func printLog() {
l.Debugln("--- Consistency error ---")
for _, e := range debugLog {
l.Debugln(e)
}
}

View File

@@ -98,6 +98,9 @@ func NewModel(indexDir string, cfg *config.Configuration, clientName, clientVers
sup: suppressor{threshold: int64(cfg.Options.MaxChangeKbps)},
}
// TEMP: #344
files.SetCM(m.cm)
var timeout = 20 * 60 // seconds
if t := os.Getenv("STDEADLOCKTIMEOUT"); len(t) > 0 {
it, err := strconv.Atoi(t)
@@ -369,6 +372,8 @@ func (m *Model) ClusterConfig(nodeID string, config protocol.ClusterConfigMessag
m.nodeVer[nodeID] = config.ClientName + " " + config.ClientVersion
}
m.pmut.Unlock()
l.Infof(`Node %s client is "%s %s"`, nodeID, config.ClientName, config.ClientVersion)
}
// Close removes the peer from the model and closes the underlying connection if possible.
@@ -451,19 +456,6 @@ func (m *Model) ReplaceLocal(repo string, fs []scanner.File) {
m.rmut.RUnlock()
}
func (m *Model) SeedLocal(repo string, fs []protocol.FileInfo) {
var sfs = make([]scanner.File, len(fs))
for i := 0; i < len(fs); i++ {
lamport.Default.Tick(fs[i].Version)
sfs[i] = fileFromFileInfo(fs[i])
sfs[i].Suppressed = false // we might have saved an index with files that were suppressed; the should not be on startup
}
m.rmut.RLock()
m.repoFiles[repo].Replace(cid.LocalID, sfs)
m.rmut.RUnlock()
}
func (m *Model) CurrentRepoFile(repo string, file string) scanner.File {
m.rmut.RLock()
f := m.repoFiles[repo].Get(cid.LocalID, file)
@@ -528,7 +520,14 @@ func (m *Model) AddConnection(rawConn io.Closer, protoConn protocol.Connection)
if debug {
l.Debugf("IDX(out/initial): %s: %q: %d files", nodeID, repo, len(idx))
}
protoConn.Index(repo, idx)
const batchSize = 1000
for i := 0; i < len(idx); i += batchSize {
if len(idx[i:]) < batchSize {
protoConn.Index(repo, idx[i:])
} else {
protoConn.Index(repo, idx[i:i+batchSize])
}
}
}
}()
}
@@ -733,7 +732,15 @@ func (m *Model) LoadIndexes(dir string) {
m.rmut.RLock()
for repo := range m.repoCfgs {
fs := m.loadIndex(repo, dir)
m.SeedLocal(repo, fs)
var sfs = make([]scanner.File, len(fs))
for i := 0; i < len(fs); i++ {
lamport.Default.Tick(fs[i].Version)
sfs[i] = fileFromFileInfo(fs[i])
sfs[i].Suppressed = false // we might have saved an index with files that were suppressed; the should not be on startup
}
m.repoFiles[repo].Replace(cid.LocalID, sfs)
}
m.rmut.RUnlock()
}

View File

@@ -28,6 +28,12 @@ const (
messageTypeIndexUpdate = 6
)
const (
stateInitial = iota
stateCCRcvd
stateIdxRcvd
)
const (
FlagDeleted uint32 = 1 << 12
FlagInvalid = 1 << 13
@@ -70,26 +76,27 @@ type Connection interface {
type rawConnection struct {
id string
receiver Model
state int
reader io.ReadCloser
cr *countingReader
xr *xdr.Reader
writer io.WriteCloser
cw *countingWriter
wb *bufio.Writer
xw *xdr.Writer
cw *countingWriter
wb *bufio.Writer
xw *xdr.Writer
wmut sync.Mutex
awaiting []chan asyncResult
awaitingMut sync.Mutex
indexSent map[string]map[string]uint64
awaiting []chan asyncResult
imut sync.Mutex
idxMut sync.Mutex // ensures serialization of Index calls
idxSent map[string]map[string]uint64
idxMut sync.Mutex // ensures serialization of Index calls
nextID chan int
outbox chan []encodable
closed chan struct{}
once sync.Once
}
type asyncResult struct {
@@ -114,20 +121,21 @@ func NewConnection(nodeID string, reader io.Reader, writer io.Writer, receiver M
wb := bufio.NewWriter(flwr)
c := rawConnection{
id: nodeID,
receiver: nativeModel{receiver},
reader: flrd,
cr: cr,
xr: xdr.NewReader(flrd),
writer: flwr,
cw: cw,
wb: wb,
xw: xdr.NewWriter(wb),
awaiting: make([]chan asyncResult, 0x1000),
indexSent: make(map[string]map[string]uint64),
outbox: make(chan []encodable),
nextID: make(chan int),
closed: make(chan struct{}),
id: nodeID,
receiver: nativeModel{receiver},
state: stateInitial,
reader: flrd,
cr: cr,
xr: xdr.NewReader(flrd),
writer: flwr,
cw: cw,
wb: wb,
xw: xdr.NewWriter(wb),
awaiting: make([]chan asyncResult, 0x1000),
idxSent: make(map[string]map[string]uint64),
outbox: make(chan []encodable),
nextID: make(chan int),
closed: make(chan struct{}),
}
go c.indexSerializerLoop()
@@ -148,31 +156,29 @@ func (c *rawConnection) Index(repo string, idx []FileInfo) {
c.idxMut.Lock()
defer c.idxMut.Unlock()
c.imut.Lock()
var msgType int
if c.indexSent[repo] == nil {
if c.idxSent[repo] == nil {
// This is the first time we send an index.
msgType = messageTypeIndex
c.indexSent[repo] = make(map[string]uint64)
c.idxSent[repo] = make(map[string]uint64)
for _, f := range idx {
c.indexSent[repo][f.Name] = f.Version
c.idxSent[repo][f.Name] = f.Version
}
} else {
// We have sent one full index. Only send updates now.
msgType = messageTypeIndexUpdate
var diff []FileInfo
for _, f := range idx {
if vs, ok := c.indexSent[repo][f.Name]; !ok || f.Version != vs {
if vs, ok := c.idxSent[repo][f.Name]; !ok || f.Version != vs {
diff = append(diff, f)
c.indexSent[repo][f.Name] = f.Version
c.idxSent[repo][f.Name] = f.Version
}
}
idx = diff
}
c.imut.Unlock()
if len(idx) > 0 {
if msgType == messageTypeIndex || len(idx) > 0 {
c.send(header{0, -1, msgType}, IndexMessage{repo, idx})
}
}
@@ -186,13 +192,13 @@ func (c *rawConnection) Request(repo string, name string, offset int64, size int
return nil, ErrClosed
}
c.imut.Lock()
c.awaitingMut.Lock()
if ch := c.awaiting[id]; ch != nil {
panic("id taken")
}
rc := make(chan asyncResult)
rc := make(chan asyncResult, 1)
c.awaiting[id] = rc
c.imut.Unlock()
c.awaitingMut.Unlock()
ok := c.send(header{0, id, messageTypeRequest},
RequestMessage{repo, name, uint64(offset), uint32(size)})
@@ -221,9 +227,9 @@ func (c *rawConnection) ping() bool {
}
rc := make(chan asyncResult, 1)
c.imut.Lock()
c.awaitingMut.Lock()
c.awaiting[id] = rc
c.imut.Unlock()
c.awaitingMut.Unlock()
ok := c.send(header{0, id, messageTypePing})
if !ok {
@@ -257,21 +263,34 @@ func (c *rawConnection) readerLoop() (err error) {
switch hdr.msgType {
case messageTypeIndex:
if c.state < stateCCRcvd {
return fmt.Errorf("protocol error: index message in state %d", c.state)
}
if err := c.handleIndex(); err != nil {
return err
}
c.state = stateIdxRcvd
case messageTypeIndexUpdate:
if c.state < stateIdxRcvd {
return fmt.Errorf("protocol error: index update message in state %d", c.state)
}
if err := c.handleIndexUpdate(); err != nil {
return err
}
case messageTypeRequest:
if c.state < stateIdxRcvd {
return fmt.Errorf("protocol error: request message in state %d", c.state)
}
if err := c.handleRequest(hdr); err != nil {
return err
}
case messageTypeResponse:
if c.state < stateIdxRcvd {
return fmt.Errorf("protocol error: response message in state %d", c.state)
}
if err := c.handleResponse(hdr); err != nil {
return err
}
@@ -283,9 +302,13 @@ func (c *rawConnection) readerLoop() (err error) {
c.handlePong(hdr)
case messageTypeClusterConfig:
if c.state != stateInitial {
return fmt.Errorf("protocol error: cluster config message in state %d", c.state)
}
if err := c.handleClusterConfig(); err != nil {
return err
}
c.state = stateCCRcvd
default:
return fmt.Errorf("protocol error: %s: unknown message type %#x", c.id, hdr.msgType)
@@ -365,32 +388,25 @@ func (c *rawConnection) handleResponse(hdr header) error {
return err
}
go func(hdr header, err error) {
c.imut.Lock()
rc := c.awaiting[hdr.msgID]
c.awaitingMut.Lock()
if rc := c.awaiting[hdr.msgID]; rc != nil {
c.awaiting[hdr.msgID] = nil
c.imut.Unlock()
if rc != nil {
rc <- asyncResult{data, err}
close(rc)
}
}(hdr, c.xr.Error())
rc <- asyncResult{data, nil}
close(rc)
}
c.awaitingMut.Unlock()
return nil
}
func (c *rawConnection) handlePong(hdr header) {
c.imut.Lock()
c.awaitingMut.Lock()
if rc := c.awaiting[hdr.msgID]; rc != nil {
go func() {
rc <- asyncResult{}
close(rc)
}()
c.awaiting[hdr.msgID] = nil
rc <- asyncResult{}
close(rc)
}
c.imut.Unlock()
c.awaitingMut.Unlock()
}
func (c *rawConnection) handleClusterConfig() error {
@@ -435,17 +451,14 @@ func (c *rawConnection) send(h header, es ...encodable) bool {
func (c *rawConnection) writerLoop() {
var err error
for es := range c.outbox {
c.wmut.Lock()
for _, e := range es {
e.encodeXDR(c.xw)
}
if err = c.flush(); err != nil {
c.wmut.Unlock()
c.close(err)
return
}
c.wmut.Unlock()
}
}
@@ -470,29 +483,20 @@ func (c *rawConnection) flush() error {
}
func (c *rawConnection) close(err error) {
c.imut.Lock()
c.wmut.Lock()
defer c.imut.Unlock()
defer c.wmut.Unlock()
select {
case <-c.closed:
return
default:
c.once.Do(func() {
close(c.closed)
c.awaitingMut.Lock()
for i, ch := range c.awaiting {
if ch != nil {
close(ch)
c.awaiting[i] = nil
}
}
c.writer.Close()
c.reader.Close()
c.awaitingMut.Unlock()
go c.receiver.Close(c.id, err)
}
})
}
func (c *rawConnection) idGenerator() {
@@ -554,8 +558,7 @@ func (c *rawConnection) pingerLoop() {
func (c *rawConnection) processRequest(msgID int, req RequestMessage) {
data, _ := c.receiver.Request(c.id, req.Repository, req.Name, int64(req.Offset), int(req.Size))
c.send(header{0, msgID, messageTypeResponse},
encodableBytes(data))
c.send(header{0, msgID, messageTypeResponse}, encodableBytes(data))
}
type Statistics struct {

View File

@@ -203,14 +203,26 @@ func getServiceURL(rootURL string) (string, error) {
}
u, _ := url.Parse(rootURL)
if svc.ControlURL[0] == '/' {
u.Path = svc.ControlURL
} else {
u.Path += svc.ControlURL
}
replaceRawPath(u, svc.ControlURL)
return u.String(), nil
}
func replaceRawPath(u *url.URL, rp string) {
var p, q string
fs := strings.Split(rp, "?")
p = fs[0]
if len(fs) > 1 {
q = fs[1]
}
if p[0] == '/' {
u.Path = p
} else {
u.Path += p
}
u.RawQuery = q
}
func soapRequest(url, function, message string) error {
tpl := `<?xml version="1.0" ?>
<s:Envelope xmlns:s="http://schemas.xmlsoap.org/soap/envelope/" s:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/">