mirror of
https://github.com/syncthing/syncthing.git
synced 2026-01-15 09:19:13 -05:00
Compare commits
4 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
50b37f1366 | ||
|
|
a7b6e35467 | ||
|
|
37d83a4e2e | ||
|
|
a720f90a70 |
@@ -168,16 +168,21 @@ func (d *Discoverer) sendLocalAnnouncements() {
|
||||
}
|
||||
|
||||
func (d *Discoverer) sendExternalAnnouncements() {
|
||||
// this should go in the Discoverer struct
|
||||
errorRetryIntv := 60 * time.Second
|
||||
|
||||
remote, err := net.ResolveUDPAddr("udp", d.extServer)
|
||||
if err != nil {
|
||||
l.Warnf("Global discovery: %v; no external announcements", err)
|
||||
return
|
||||
for err != nil {
|
||||
l.Warnf("Global discovery: %v; trying again in %v", err, errorRetryIntv)
|
||||
time.Sleep(errorRetryIntv)
|
||||
remote, err = net.ResolveUDPAddr("udp", d.extServer)
|
||||
}
|
||||
|
||||
conn, err := net.ListenUDP("udp", nil)
|
||||
if err != nil {
|
||||
l.Warnf("Global discovery: %v; no external announcements", err)
|
||||
return
|
||||
for err != nil {
|
||||
l.Warnf("Global discovery: %v; trying again in %v", err, errorRetryIntv)
|
||||
time.Sleep(errorRetryIntv)
|
||||
conn, err = net.ListenUDP("udp", nil)
|
||||
}
|
||||
|
||||
var buf []byte
|
||||
@@ -198,7 +203,7 @@ func (d *Discoverer) sendExternalAnnouncements() {
|
||||
l.Debugf("discover: send announcement -> %v\n%s", remote, hex.Dump(buf))
|
||||
}
|
||||
|
||||
_, err = conn.WriteTo(buf, remote)
|
||||
_, err := conn.WriteTo(buf, remote)
|
||||
if err != nil {
|
||||
if debug {
|
||||
l.Debugln("discover: warning:", err)
|
||||
@@ -222,7 +227,7 @@ func (d *Discoverer) sendExternalAnnouncements() {
|
||||
if ok {
|
||||
time.Sleep(d.globalBcastIntv)
|
||||
} else {
|
||||
time.Sleep(60 * time.Second)
|
||||
time.Sleep(errorRetryIntv)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -49,7 +49,6 @@ func (m *Set) Replace(id uint, fs []scanner.File) {
|
||||
}
|
||||
|
||||
m.Lock()
|
||||
log("Replace", id, len(fs))
|
||||
if len(fs) == 0 || !m.equals(id, fs) {
|
||||
m.changes[id]++
|
||||
m.replace(id, fs)
|
||||
@@ -66,7 +65,6 @@ func (m *Set) ReplaceWithDelete(id uint, fs []scanner.File) {
|
||||
}
|
||||
|
||||
m.Lock()
|
||||
log("ReplaceWithDelete", id, len(fs))
|
||||
if len(fs) == 0 || !m.equals(id, fs) {
|
||||
m.changes[id]++
|
||||
|
||||
@@ -104,9 +102,7 @@ func (m *Set) Update(id uint, fs []scanner.File) {
|
||||
if debug {
|
||||
l.Debugf("Update(%d, [%d])", id, len(fs))
|
||||
}
|
||||
|
||||
m.Lock()
|
||||
log("Update", id, len(fs))
|
||||
m.update(id, fs)
|
||||
m.changes[id]++
|
||||
m.Unlock()
|
||||
@@ -224,7 +220,6 @@ func (m *Set) equals(id uint, fs []scanner.File) bool {
|
||||
func (m *Set) update(cid uint, fs []scanner.File) {
|
||||
remFiles := m.remoteKey[cid]
|
||||
if remFiles == nil {
|
||||
printLog()
|
||||
l.Fatalln("update before replace for cid", cid)
|
||||
}
|
||||
for _, f := range fs {
|
||||
|
||||
@@ -1,51 +0,0 @@
|
||||
package files
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/calmh/syncthing/cid"
|
||||
)
|
||||
|
||||
type logEntry struct {
|
||||
time time.Time
|
||||
method string
|
||||
cid uint
|
||||
node string
|
||||
nfiles int
|
||||
}
|
||||
|
||||
func (l logEntry) String() string {
|
||||
return fmt.Sprintf("%v: %s cid:%d node:%s nfiles:%d", l.time, l.method, l.cid, l.node, l.nfiles)
|
||||
}
|
||||
|
||||
var (
|
||||
debugLog [10]logEntry
|
||||
debugNext int
|
||||
cm *cid.Map
|
||||
)
|
||||
|
||||
func SetCM(m *cid.Map) {
|
||||
cm = m
|
||||
}
|
||||
|
||||
func log(method string, id uint, nfiles int) {
|
||||
e := logEntry{
|
||||
time: time.Now(),
|
||||
method: method,
|
||||
cid: id,
|
||||
nfiles: nfiles,
|
||||
}
|
||||
if cm != nil {
|
||||
e.node = cm.Name(id)
|
||||
}
|
||||
debugLog[debugNext] = e
|
||||
debugNext = (debugNext + 1) % len(debugLog)
|
||||
}
|
||||
|
||||
func printLog() {
|
||||
l.Debugln("--- Consistency error ---")
|
||||
for _, e := range debugLog {
|
||||
l.Debugln(e)
|
||||
}
|
||||
}
|
||||
@@ -98,9 +98,6 @@ func NewModel(indexDir string, cfg *config.Configuration, clientName, clientVers
|
||||
sup: suppressor{threshold: int64(cfg.Options.MaxChangeKbps)},
|
||||
}
|
||||
|
||||
// TEMP: #344
|
||||
files.SetCM(m.cm)
|
||||
|
||||
var timeout = 20 * 60 // seconds
|
||||
if t := os.Getenv("STDEADLOCKTIMEOUT"); len(t) > 0 {
|
||||
it, err := strconv.Atoi(t)
|
||||
|
||||
@@ -97,6 +97,15 @@ type rawConnection struct {
|
||||
outbox chan []encodable
|
||||
closed chan struct{}
|
||||
once sync.Once
|
||||
|
||||
incomingIndexes chan incomingIndex
|
||||
}
|
||||
|
||||
type incomingIndex struct {
|
||||
update bool
|
||||
id string
|
||||
repo string
|
||||
files []FileInfo
|
||||
}
|
||||
|
||||
type asyncResult struct {
|
||||
@@ -121,21 +130,22 @@ func NewConnection(nodeID string, reader io.Reader, writer io.Writer, receiver M
|
||||
wb := bufio.NewWriter(flwr)
|
||||
|
||||
c := rawConnection{
|
||||
id: nodeID,
|
||||
receiver: nativeModel{receiver},
|
||||
state: stateInitial,
|
||||
reader: flrd,
|
||||
cr: cr,
|
||||
xr: xdr.NewReader(flrd),
|
||||
writer: flwr,
|
||||
cw: cw,
|
||||
wb: wb,
|
||||
xw: xdr.NewWriter(wb),
|
||||
awaiting: make([]chan asyncResult, 0x1000),
|
||||
idxSent: make(map[string]map[string]uint64),
|
||||
outbox: make(chan []encodable),
|
||||
nextID: make(chan int),
|
||||
closed: make(chan struct{}),
|
||||
id: nodeID,
|
||||
receiver: nativeModel{receiver},
|
||||
state: stateInitial,
|
||||
reader: flrd,
|
||||
cr: cr,
|
||||
xr: xdr.NewReader(flrd),
|
||||
writer: flwr,
|
||||
cw: cw,
|
||||
wb: wb,
|
||||
xw: xdr.NewWriter(wb),
|
||||
awaiting: make([]chan asyncResult, 0x1000),
|
||||
idxSent: make(map[string]map[string]uint64),
|
||||
outbox: make(chan []encodable),
|
||||
nextID: make(chan int),
|
||||
closed: make(chan struct{}),
|
||||
incomingIndexes: make(chan incomingIndex, 100), // should be enough for anyone, right?
|
||||
}
|
||||
|
||||
go c.indexSerializerLoop()
|
||||
@@ -316,15 +326,6 @@ func (c *rawConnection) readerLoop() (err error) {
|
||||
}
|
||||
}
|
||||
|
||||
type incomingIndex struct {
|
||||
update bool
|
||||
id string
|
||||
repo string
|
||||
files []FileInfo
|
||||
}
|
||||
|
||||
var incomingIndexes = make(chan incomingIndex, 100) // should be enough for anyone, right?
|
||||
|
||||
func (c *rawConnection) indexSerializerLoop() {
|
||||
// We must avoid blocking the reader loop when processing large indexes.
|
||||
// There is otherwise a potential deadlock where both sides has the model
|
||||
@@ -332,11 +333,16 @@ func (c *rawConnection) indexSerializerLoop() {
|
||||
// large index update from the other side. But we must also ensure to
|
||||
// process the indexes in the order they are received, hence the separate
|
||||
// routine and buffered channel.
|
||||
for ii := range incomingIndexes {
|
||||
if ii.update {
|
||||
c.receiver.IndexUpdate(ii.id, ii.repo, ii.files)
|
||||
} else {
|
||||
c.receiver.Index(ii.id, ii.repo, ii.files)
|
||||
for {
|
||||
select {
|
||||
case ii := <-c.incomingIndexes:
|
||||
if ii.update {
|
||||
c.receiver.IndexUpdate(ii.id, ii.repo, ii.files)
|
||||
} else {
|
||||
c.receiver.Index(ii.id, ii.repo, ii.files)
|
||||
}
|
||||
case <-c.closed:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -355,7 +361,7 @@ func (c *rawConnection) handleIndex() error {
|
||||
// update and can't receive the large index update from the
|
||||
// other side.
|
||||
|
||||
incomingIndexes <- incomingIndex{false, c.id, im.Repository, im.Files}
|
||||
c.incomingIndexes <- incomingIndex{false, c.id, im.Repository, im.Files}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@@ -366,7 +372,7 @@ func (c *rawConnection) handleIndexUpdate() error {
|
||||
if err := c.xr.Error(); err != nil {
|
||||
return err
|
||||
} else {
|
||||
incomingIndexes <- incomingIndex{true, c.id, im.Repository, im.Files}
|
||||
c.incomingIndexes <- incomingIndex{true, c.id, im.Repository, im.Files}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@@ -450,13 +456,18 @@ func (c *rawConnection) send(h header, es ...encodable) bool {
|
||||
|
||||
func (c *rawConnection) writerLoop() {
|
||||
var err error
|
||||
for es := range c.outbox {
|
||||
for _, e := range es {
|
||||
e.encodeXDR(c.xw)
|
||||
}
|
||||
for {
|
||||
select {
|
||||
case es := <-c.outbox:
|
||||
for _, e := range es {
|
||||
e.encodeXDR(c.xw)
|
||||
}
|
||||
|
||||
if err = c.flush(); err != nil {
|
||||
c.close(err)
|
||||
if err = c.flush(); err != nil {
|
||||
c.close(err)
|
||||
return
|
||||
}
|
||||
case <-c.closed:
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user