Mirror of https://github.com/syncthing/syncthing.git (synced 2025-12-24 06:28:10 -05:00)

Compare commits

23 Commits
| SHA1 |
|---|
| c549e413a2 |
| 63a05ff6fa |
| 89a5aac6ea |
| 232d715c37 |
| 1c4e710adc |
| 7fdea0dd93 |
| 5b84b72d15 |
| 7e0be89052 |
| 632bcae856 |
| fd56123acf |
| a2a2e1d466 |
| d4c5786a14 |
| 42ad9f8b02 |
| 0f6b34160c |
| 7e3b29e3e0 |
| 2f660aff7a |
| af3e64a5a7 |
| 9560265adc |
| 4097528aa2 |
| 71d50a50f4 |
| ec0489a8ea |
| 7948d046d1 |
| 223bdbb9aa |
README.md (10 lines changed)
@@ -115,8 +115,8 @@ Run syncthing to let it create it's config directory and certificate:
```
$ syncthing
11:34:13 main.go:85: INFO: Version v0.1-40-gbb0fd87
11:34:13 tls.go:61: OK: wrote cert.pem
11:34:13 tls.go:67: OK: wrote key.pem
11:34:13 tls.go:61: OK: Created TLS certificate file
11:34:13 tls.go:67: OK: Created TLS key file
11:34:13 main.go:66: INFO: My ID: NCTBZAAHXR6ZZP3D7SL3DLYFFQERMW4Q
11:34:13 main.go:90: FATAL: No config file
```

@@ -159,8 +159,12 @@ $ syncthing --ro
13:30:59 main.go:247: INFO: Starting local discovery
13:30:59 main.go:165: OK: Ready to synchronize
13:31:04 discover.go:113: INFO: Discovered node CUGAE43Y5N64CRJU26YFH6MTWPSBLSUL at 172.16.32.24:22000
13:31:14 main.go:296: OK: Connected to node CUGAE43Y5N64CRJU26YFH6MTWPSBLSUL
13:31:14 main.go:296: INFO: Connected to node CUGAE43Y5N64CRJU26YFH6MTWPSBLSUL
13:31:19 main.go:345: INFO: Transferred 139 KiB in (14 KiB/s), 139 KiB out (14 KiB/s)
13:32:20 model.go:94: INFO: CUGAE43Y5N64CRJU26YFH6MTWPSBLSUL: 263.4 KB/s in, 69.1 KB/s out
13:32:20 model.go:104: INFO: 18289 files, 24.24 GB in cluster
13:32:20 model.go:111: INFO: 17132 files, 22.39 GB in local repo
13:32:20 model.go:117: INFO: 1157 files, 1.84 GB to synchronize
...
```
You should see the synchronization start and then finish a short while
build.sh (20 lines changed)
@@ -18,8 +18,22 @@ for goos in darwin linux freebsd ; do
&& cp syncthing "build/$name" \
&& cp README.md LICENSE "$name" \
&& mv syncthing "$name" \
&& tar zcf "$name.tar.gz" "$name" \
&& rm -r "$name" \
&& mv "$name.tar.gz" build
&& tar zcf "build/$name.tar.gz" "$name" \
&& rm -r "$name"
done
done

for goos in windows ; do
for goarch in amd64 386 ; do
echo "$goos-$goarch"
export GOOS="$goos"
export GOARCH="$goarch"
export name="syncthing-$goos-$goarch"
go build -ldflags "-X main.Version $version" \
&& mkdir -p "$name" \
&& cp syncthing.exe "build/$name.exe" \
&& cp README.md LICENSE "$name" \
&& zip -qr "build/$name.zip" "$name" \
&& rm -r "$name"
done
done
@@ -238,7 +238,7 @@ func (d *Discoverer) externalLookup(node string) (string, bool) {
return "", false
}

var buf = buffers.Get(1024)
var buf = buffers.Get(256)
defer buffers.Put(buf)

n, err := conn.Read(buf)
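The hunk above shrinks the discovery reply buffer taken from the project's internal buffers pool from 1024 to 256 bytes. For readers unfamiliar with that pattern, here is a minimal, hypothetical sketch of a Get/Put byte-slice pool built on sync.Pool; it is illustrative only and not syncthing's actual buffers package:

```go
package main

import (
	"fmt"
	"sync"
)

// pool recycles byte slices so hot paths do not allocate a new buffer
// for every discovery packet or block request.
var pool = sync.Pool{
	New: func() interface{} { return make([]byte, 0, 1024) },
}

// get returns a slice of length n, reusing pooled storage when it is large enough.
func get(n int) []byte {
	b := pool.Get().([]byte)
	if cap(b) < n {
		return make([]byte, n)
	}
	return b[:n]
}

// put hands the slice back to the pool for later reuse.
func put(b []byte) {
	pool.Put(b[:0])
}

func main() {
	buf := get(256) // mirrors buffers.Get(256) in the hunk above
	defer put(buf)  // mirrors defer buffers.Put(buf)
	fmt.Println("buffer length:", len(buf), "capacity:", cap(buf))
}
```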
logger.go (15 lines changed)
@@ -6,21 +6,16 @@ import (
"os"
)

var debugEnabled = true
var logger = log.New(os.Stderr, "", log.Lshortfile|log.Ltime)
var logger = log.New(os.Stderr, "", log.Ltime)

func debugln(vals ...interface{}) {
if debugEnabled {
s := fmt.Sprintln(vals...)
logger.Output(2, "DEBUG: "+s)
}
s := fmt.Sprintln(vals...)
logger.Output(2, "DEBUG: "+s)
}

func debugf(format string, vals ...interface{}) {
if debugEnabled {
s := fmt.Sprintf(format, vals...)
logger.Output(2, "DEBUG: "+s)
}
s := fmt.Sprintf(format, vals...)
logger.Output(2, "DEBUG: "+s)
}

func infoln(vals ...interface{}) {
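The debug helpers above forward to logger.Output with a calldepth of 2, so the reported file and line belong to the caller of debugln/debugf rather than to logger.go itself. A tiny standalone sketch (assumed names, not the project's file) showing that mechanism:

```go
package main

import (
	"fmt"
	"log"
	"os"
)

// With calldepth 2, Output attributes the log line to the caller of debugln,
// not to this wrapper, which is what makes Lshortfile useful in such helpers.
var logger = log.New(os.Stderr, "", log.Ltime|log.Lshortfile)

func debugln(vals ...interface{}) {
	logger.Output(2, "DEBUG: "+fmt.Sprintln(vals...))
}

func main() {
	debugln("hello") // prints something like: 12:00:00 main.go:17: DEBUG: hello
}
```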
main.go (67 lines changed)
@@ -1,6 +1,7 @@
package main

import (
"compress/gzip"
"crypto/sha1"
"crypto/tls"
"fmt"

@@ -25,6 +26,7 @@ type Options struct {
Listen string `short:"l" long:"listen" description:"Listen address" default:":22000" value-name:"ADDR"`
ReadOnly bool `short:"r" long:"ro" description:"Repository is read only"`
Delete bool `short:"d" long:"delete" description:"Delete files deleted from cluster"`
Rehash bool `long:"rehash" description:"Ignore cache and rehash all files in repository"`
NoSymlinks bool `long:"no-symlinks" description:"Don't follow first level symlinks in the repo"`
Discovery DiscoveryOptions `group:"Discovery Options"`
Advanced AdvancedOptions `group:"Advanced Options"`

@@ -32,6 +34,7 @@ type Options struct {
}

type DebugOptions struct {
LogSource bool `long:"log-source"`
TraceFile bool `long:"trace-file"`
TraceNet bool `long:"trace-net"`
TraceIdx bool `long:"trace-idx"`

@@ -46,8 +49,8 @@ type DiscoveryOptions struct {
}

type AdvancedOptions struct {
RequestsInFlight int `long:"reqs-in-flight" description:"Parallell in flight requests per file" default:"8" value-name:"REQS"`
FilesInFlight int `long:"files-in-flight" description:"Parallell in flight file pulls" default:"4" value-name:"FILES"`
RequestsInFlight int `long:"reqs-in-flight" description:"Parallell in flight requests per file" default:"4" value-name:"REQS"`
FilesInFlight int `long:"files-in-flight" description:"Parallell in flight file pulls" default:"8" value-name:"FILES"`
ScanInterval time.Duration `long:"scan-intv" description:"Repository scan interval" default:"60s" value-name:"INTV"`
ConnInterval time.Duration `long:"conn-intv" description:"Node reconnect interval" default:"60s" value-name:"INTV"`
}

@@ -65,19 +68,14 @@ var (
nodeAddrs = make(map[string][]string)
)

// Options
var (
ConfDir = path.Join(getHomeDir(), confDirName)
)

func main() {
// Useful for debugging; to be adjusted.
log.SetFlags(log.Ltime | log.Lshortfile)

_, err := flags.Parse(&opts)
if err != nil {
os.Exit(0)
}
if opts.Debug.TraceFile || opts.Debug.TraceIdx || opts.Debug.TraceNet || opts.Debug.LogSource {
logger = log.New(os.Stderr, "", log.Lshortfile|log.Ldate|log.Ltime|log.Lmicroseconds)
}
if strings.HasPrefix(opts.ConfDir, "~/") {
opts.ConfDir = strings.Replace(opts.ConfDir, "~", getHomeDir(), 1)
}

@@ -86,11 +84,11 @@ func main() {

// Ensure that our home directory exists and that we have a certificate and key.

ensureDir(ConfDir, 0700)
cert, err := loadCert(ConfDir)
ensureDir(opts.ConfDir, 0700)
cert, err := loadCert(opts.ConfDir)
if err != nil {
newCertificate(ConfDir)
cert, err = loadCert(ConfDir)
newCertificate(opts.ConfDir)
cert, err = loadCert(opts.ConfDir)
fatalErr(err)
}

@@ -119,7 +117,7 @@ func main() {

// Load the configuration file, if it exists.

cf, err := os.Open(path.Join(ConfDir, confFileName))
cf, err := os.Open(path.Join(opts.ConfDir, confFileName))
if err != nil {
fatalln("No config file")
config = ini.Config{}

@@ -143,8 +141,11 @@ func main() {
// Walk the repository and update the local model before establishing any
// connections to other nodes.

infoln("Initial repository scan in progress")
loadIndex(m)
if !opts.Rehash {
infoln("Loading index cache")
loadIndex(m)
}
infoln("Populating repository index")
updateLocalModel(m)

// Routine to listen for incoming connections

@@ -214,9 +215,7 @@ listen:

for nodeID := range nodeAddrs {
if nodeID == remoteID {
nc := protocol.NewConnection(remoteID, conn, conn, m)
m.AddNode(nc)
okln("Connected to nodeID", remoteID, "(in)")
m.AddConnection(conn, remoteID)
continue listen
}
}

@@ -285,9 +284,7 @@ func connect(myID string, addr string, nodeAddrs map[string][]string, m *Model,
continue
}

nc := protocol.NewConnection(nodeID, conn, conn, m)
m.AddNode(nc)
okln("Connected to node", remoteID, "(out)")
m.AddConnection(conn, remoteID)
continue nextNode
}
}

@@ -303,24 +300,36 @@ func updateLocalModel(m *Model) {
}

func saveIndex(m *Model) {
fname := fmt.Sprintf("%x.idx", sha1.Sum([]byte(m.Dir())))
idxf, err := os.Create(path.Join(ConfDir, fname))
name := fmt.Sprintf("%x.idx.gz", sha1.Sum([]byte(m.Dir())))
fullName := path.Join(opts.ConfDir, name)
idxf, err := os.Create(fullName + ".tmp")
if err != nil {
return
}
protocol.WriteIndex(idxf, m.ProtocolIndex())

gzw := gzip.NewWriter(idxf)

protocol.WriteIndex(gzw, m.ProtocolIndex())
gzw.Close()
idxf.Close()
os.Rename(fullName+".tmp", fullName)
}

func loadIndex(m *Model) {
fname := fmt.Sprintf("%x.idx", sha1.Sum([]byte(m.Dir())))
idxf, err := os.Open(path.Join(ConfDir, fname))
fname := fmt.Sprintf("%x.idx.gz", sha1.Sum([]byte(m.Dir())))
idxf, err := os.Open(path.Join(opts.ConfDir, fname))
if err != nil {
return
}
defer idxf.Close()

idx, err := protocol.ReadIndex(idxf)
gzr, err := gzip.NewReader(idxf)
if err != nil {
return
}
defer gzr.Close()

idx, err := protocol.ReadIndex(gzr)
if err != nil {
return
}
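The rewritten saveIndex gzips the index into a temporary file and then renames it over the final name, so a crash mid-write leaves the old cache intact. A generic sketch of that write-to-temp-then-rename pattern, with made-up names rather than the project's types:

```go
package main

import (
	"compress/gzip"
	"log"
	"os"
)

// writeFileAtomic gzips data into name+".tmp" and renames it over name,
// so readers either see the old file or the complete new one, never a
// half-written mix of the two.
func writeFileAtomic(name string, data []byte) error {
	f, err := os.Create(name + ".tmp")
	if err != nil {
		return err
	}
	gzw := gzip.NewWriter(f)
	if _, err := gzw.Write(data); err != nil {
		f.Close()
		return err
	}
	if err := gzw.Close(); err != nil {
		f.Close()
		return err
	}
	if err := f.Close(); err != nil {
		return err
	}
	return os.Rename(name+".tmp", name)
}

func main() {
	// Hypothetical filename and payload, used only to exercise the helper.
	if err := writeFileAtomic("index.gz", []byte("example index payload")); err != nil {
		log.Fatal(err)
	}
}
```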
model.go (62 lines changed)
@@ -12,8 +12,8 @@ acquire locks, but document what locks they require.
*/

import (
"errors"
"fmt"
"io"
"os"
"path"
"sync"

@@ -27,11 +27,12 @@ type Model struct {
sync.RWMutex
dir string

global map[string]File // the latest version of each file as it exists in the cluster
local map[string]File // the files we currently have locally on disk
remote map[string]map[string]File
need map[string]bool // the files we need to update
nodes map[string]*protocol.Connection
global map[string]File // the latest version of each file as it exists in the cluster
local map[string]File // the files we currently have locally on disk
remote map[string]map[string]File
need map[string]bool // the files we need to update
nodes map[string]*protocol.Connection
rawConn map[string]io.ReadWriteCloser

updatedLocal int64 // timestamp of last update to local
updateGlobal int64 // timestamp of last update to remote

@@ -40,10 +41,6 @@ type Model struct {
lastIdxBcastRequest time.Time
}

var (
errNoSuchNode = errors.New("no such node")
)

const (
FlagDeleted = 1 << 12

@@ -59,6 +56,7 @@ func NewModel(dir string) *Model {
remote: make(map[string]map[string]File),
need: make(map[string]bool),
nodes: make(map[string]*protocol.Connection),
rawConn: make(map[string]io.ReadWriteCloser),
lastIdxBcast: time.Now(),
}

@@ -88,9 +86,7 @@ func (m *Model) printStatsLoop() {
func (m *Model) printConnectionStats() {
for node, conn := range m.nodes {
stats := conn.Statistics()
if (stats.InBytesPerSec > 0 || stats.OutBytesPerSec > 0) && stats.Latency > 0 {
infof("%s: %sB/s in, %sB/s out, %0.02f ms", node, toSI(stats.InBytesPerSec), toSI(stats.OutBytesPerSec), stats.Latency.Seconds()*1000)
} else if stats.InBytesPerSec > 0 || stats.OutBytesPerSec > 0 {
if stats.InBytesPerSec > 0 || stats.OutBytesPerSec > 0 {
infof("%s: %sB/s in, %sB/s out", node, toSI(stats.InBytesPerSec), toSI(stats.OutBytesPerSec))
}
}

@@ -195,16 +191,26 @@ func (m *Model) SeedIndex(fs []protocol.FileInfo) {
m.printModelStats()
}

func (m *Model) Close(node string) {
func (m *Model) Close(node string, err error) {
m.Lock()
defer m.Unlock()

if opts.Debug.TraceNet {
debugf("NET CLOSE: %s", node)
conn, ok := m.rawConn[node]
if ok {
conn.Close()
} else {
warnln("Close on unknown connection for node", node)
}

if err != nil {
warnf("Disconnected from node %s: %v", node, err)
} else {
infoln("Disconnected from node", node)
}

delete(m.remote, node)
delete(m.nodes, node)
delete(m.rawConn, node)

m.recomputeGlobal()
m.recomputeNeed()

@@ -235,7 +241,7 @@ func (m *Model) RequestGlobal(nodeID, name string, offset uint64, size uint32, h
nc, ok := m.nodes[nodeID]
m.RUnlock()
if !ok {
return nil, errNoSuchNode
return nil, fmt.Errorf("RequestGlobal: no such node: %s", nodeID)
}

if opts.Debug.TraceNet {

@@ -465,21 +471,24 @@ func (m *Model) protocolIndex() []protocol.FileInfo {
return index
}

func (m *Model) AddNode(node *protocol.Connection) {
func (m *Model) AddConnection(conn io.ReadWriteCloser, nodeID string) {
node := protocol.NewConnection(nodeID, conn, conn, m)

m.Lock()
m.nodes[node.ID] = node
m.nodes[nodeID] = node
m.rawConn[nodeID] = conn
m.Unlock()

infoln("Connected to node", nodeID)

m.RLock()
idx := m.protocolIndex()
m.RUnlock()

if opts.Debug.TraceNet {
debugf("NET IDX(out/add): %s: %d files", node.ID, len(idx))
}

// Sending the index might take a while if we have many files and a slow
// uplink. Return from AddNode in the meantime.
go node.Index(idx)
go func() {
node.Index(idx)
infoln("Sent initial index to node", nodeID)
}()
}

func fileFromFileInfo(f protocol.FileInfo) File {

@@ -491,7 +500,6 @@ func fileFromFileInfo(f protocol.FileInfo) File {
Length: b.Length,
Hash: b.Hash,
})
buffers.Put(b.Hash)
offset += uint64(b.Length)
}
return File{
@@ -30,8 +30,13 @@ func (m *Model) pullFile(name string) error {
m.RLock()
var localFile = m.local[name]
var globalFile = m.global[name]
var nodeIDs = m.whoHas(name)
m.RUnlock()

if len(nodeIDs) == 0 {
return fmt.Errorf("%s: no connected nodes with file available", name)
}

filename := path.Join(m.dir, name)
sdir := path.Dir(filename)

@@ -45,13 +50,13 @@ func (m *Model) pullFile(name string) error {
if err != nil {
return err
}
defer tmpFile.Close()

contentChan := make(chan content, 32)
var applyDone sync.WaitGroup
applyDone.Add(1)
go func() {
applyContent(contentChan, tmpFile)
tmpFile.Close()
applyDone.Done()
}()

@@ -77,9 +82,6 @@ func (m *Model) pullFile(name string) error {

// N remote copy routines

m.RLock()
var nodeIDs = m.whoHas(name)
m.RUnlock()
var remoteBlocks = blockIterator{blocks: remote}
for i := 0; i < opts.Advanced.RequestsInFlight; i++ {
curNode := nodeIDs[i%len(nodeIDs)]

@@ -110,7 +112,7 @@ func (m *Model) pullFile(name string) error {

err = hashCheck(tmpFilename, globalFile.Blocks)
if err != nil {
return fmt.Errorf("%s: %s", path.Base(name), err.Error())
return fmt.Errorf("%s: %s (deleting)", path.Base(name), err.Error())
}

err = os.Chtimes(tmpFilename, time.Unix(globalFile.Modified, 0), time.Unix(globalFile.Modified, 0))

@@ -127,7 +129,6 @@ func (m *Model) pullFile(name string) error {
}

func (m *Model) puller() {

for {
time.Sleep(time.Second)

@@ -143,12 +144,15 @@ func (m *Model) puller() {
}

var limiter = make(chan bool, opts.Advanced.FilesInFlight)
var allDone sync.WaitGroup

for _, n := range ns {
limiter <- true
allDone.Add(1)

go func(n string) {
defer func() {
allDone.Done()
<-limiter
}()

@@ -177,6 +181,8 @@ func (m *Model) puller() {
}
}(n)
}

allDone.Wait()
}
}
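The puller now bounds concurrent file pulls with a buffered channel used as a counting semaphore and waits for them all with a sync.WaitGroup. A self-contained sketch of the same pattern, with placeholder work instead of the real pullFile:

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	files := []string{"a", "b", "c", "d", "e"}

	const filesInFlight = 2 // stands in for opts.Advanced.FilesInFlight
	limiter := make(chan bool, filesInFlight)
	var allDone sync.WaitGroup

	for _, n := range files {
		limiter <- true // blocks once filesInFlight pulls are already running
		allDone.Add(1)

		go func(n string) {
			defer func() {
				allDone.Done()
				<-limiter // release the slot for the next file
			}()
			fmt.Println("pulling", n) // the real code would call pullFile(n) here
		}(n)
	}

	allDone.Wait() // only loop and rescan once every pull has finished
}
```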
@@ -190,10 +196,10 @@ func applyContent(cc <-chan content, dst io.WriterAt) error {

for c := range cc {
_, err = dst.WriteAt(c.data, c.offset)
buffers.Put(c.data)
if err != nil {
return err
}
buffers.Put(c.data)
}

return nil

@@ -294,7 +294,7 @@ func TestForgetNode(t *testing.T) {
t.Errorf("Model len(need) incorrect (%d != %d)", l1, l2)
}

m.Close("42")
m.Close("42", nil)

if l1, l2 := len(m.local), len(fs); l1 != l2 {
t.Errorf("Model len(local) incorrect (%d != %d)", l1, l2)

@@ -25,7 +25,7 @@ func (t *TestModel) Request(nodeID, name string, offset uint64, size uint32, has
return t.data, nil
}

func (t *TestModel) Close(nodeID string) {
func (t *TestModel) Close(nodeID string, err error) {
t.closed = true
}
@@ -3,8 +3,8 @@ package protocol
import (
"compress/flate"
"errors"
"fmt"
"io"
"log"
"sync"
"time"

@@ -40,29 +40,28 @@ type Model interface {
// A request was made by the peer node
Request(nodeID, name string, offset uint64, size uint32, hash []byte) ([]byte, error)
// The peer node closed the connection
Close(nodeID string)
Close(nodeID string, err error)
}

type Connection struct {
sync.RWMutex

ID string
receiver Model
reader io.Reader
mreader *marshalReader
writer io.Writer
mwriter *marshalWriter
closed bool
awaiting map[int]chan asyncResult
nextId int
peerLatency time.Duration
indexSent map[string]int64
ID string
receiver Model
reader io.Reader
mreader *marshalReader
writer io.Writer
mwriter *marshalWriter
closed bool
awaiting map[int]chan asyncResult
nextId int
indexSent map[string]int64

hasSentIndex bool
hasRecvdIndex bool

lastStatistics Statistics
statisticsLock sync.Mutex

lastReceive time.Time
lastReceiveLock sync.RWMutex
}

var ErrClosed = errors.New("Connection closed")

@@ -72,8 +71,10 @@ type asyncResult struct {
err error
}

const pingTimeout = 30 * time.Second
const pingIdleTime = 5 * time.Minute
const (
pingTimeout = 2 * time.Minute
pingIdleTime = 5 * time.Minute
)

func NewConnection(nodeID string, reader io.Reader, writer io.Writer, receiver Model) *Connection {
flrd := flate.NewReader(reader)

@@ -89,7 +90,6 @@ func NewConnection(nodeID string, reader io.Reader, writer io.Writer, receiver M
writer: flwr,
mwriter: &marshalWriter{w: flwr},
awaiting: make(map[int]chan asyncResult),
lastReceive: time.Now(),
ID: nodeID,
lastStatistics: Statistics{At: time.Now()},
}

@@ -103,7 +103,6 @@ func NewConnection(nodeID string, reader io.Reader, writer io.Writer, receiver M
// Index writes the list of file information to the connected peer node
func (c *Connection) Index(idx []FileInfo) {
c.Lock()

var msgType int
if c.indexSent == nil {
// This is the first time we send an index.

@@ -130,32 +129,38 @@ func (c *Connection) Index(idx []FileInfo) {
c.mwriter.writeIndex(idx)
err := c.flush()
c.nextId = (c.nextId + 1) & 0xfff
c.hasSentIndex = true
c.Unlock()
if err != nil || c.mwriter.err != nil {
c.close()

if err != nil {
c.Close(err)
return
} else if c.mwriter.err != nil {
c.Close(c.mwriter.err)
return
}
}

// Request returns the bytes for the specified block after fetching them from the connected peer.
func (c *Connection) Request(name string, offset uint64, size uint32, hash []byte) ([]byte, error) {
if c.isClosed() {
c.Lock()
if c.closed {
c.Unlock()
return nil, ErrClosed
}
c.Lock()
rc := make(chan asyncResult)
c.awaiting[c.nextId] = rc
c.mwriter.writeHeader(header{0, c.nextId, messageTypeRequest})
c.mwriter.writeRequest(request{name, offset, size, hash})
if c.mwriter.err != nil {
c.Unlock()
c.close()
c.Close(c.mwriter.err)
return nil, c.mwriter.err
}
err := c.flush()
if err != nil {
c.Unlock()
c.close()
c.Close(err)
return nil, err
}
c.nextId = (c.nextId + 1) & 0xfff
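Request reserves a message ID, parks a result channel in the awaiting map, writes the request, and blocks until the reader loop delivers the matching response on that channel. A stripped-down sketch of that ID-based correlation scheme (simplified types; the network write is only simulated):

```go
package main

import (
	"fmt"
	"sync"
)

type asyncResult struct {
	val []byte
	err error
}

type conn struct {
	mu       sync.Mutex
	nextID   int
	awaiting map[int]chan asyncResult
}

// request reserves an ID, registers a channel for it, and blocks until the
// (simulated) reader loop delivers the matching response.
func (c *conn) request() ([]byte, error) {
	c.mu.Lock()
	id := c.nextID
	c.nextID = (c.nextID + 1) & 0xfff // IDs wrap around, as in the diff
	rc := make(chan asyncResult, 1)
	c.awaiting[id] = rc
	c.mu.Unlock()

	// The real code writes the request to the peer here; we simulate the
	// reader loop delivering the matching response a moment later.
	go c.deliver(id, []byte("block data"))

	res := <-rc
	return res.val, res.err
}

// deliver is what a reader loop does when a response with msgID arrives:
// look up the waiting channel, remove it from the map, and hand over the data.
func (c *conn) deliver(msgID int, data []byte) {
	c.mu.Lock()
	rc, ok := c.awaiting[msgID]
	delete(c.awaiting, msgID)
	c.mu.Unlock()
	if ok {
		rc <- asyncResult{val: data}
	}
}

func main() {
	c := &conn{awaiting: make(map[int]chan asyncResult)}
	val, err := c.request()
	fmt.Println(string(val), err)
}
```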
@@ -168,26 +173,30 @@ func (c *Connection) Request(name string, offset uint64, size uint32, hash []byt
return res.val, res.err
}

func (c *Connection) Ping() (time.Duration, bool) {
if c.isClosed() {
return 0, false
}
func (c *Connection) Ping() bool {
c.Lock()
rc := make(chan asyncResult)
if c.closed {
c.Unlock()
return false
}
rc := make(chan asyncResult, 1)
c.awaiting[c.nextId] = rc
t0 := time.Now()
c.mwriter.writeHeader(header{0, c.nextId, messageTypePing})
err := c.flush()
if err != nil || c.mwriter.err != nil {
if err != nil {
c.Unlock()
c.close()
return 0, false
c.Close(err)
return false
} else if c.mwriter.err != nil {
c.Unlock()
c.Close(c.mwriter.err)
return false
}
c.nextId = (c.nextId + 1) & 0xfff
c.Unlock()

_, ok := <-rc
return time.Since(t0), ok
res, ok := <-rc
return ok && res.err == nil
}

func (c *Connection) Stop() {

@@ -204,7 +213,7 @@ func (c *Connection) flush() error {
return nil
}

func (c *Connection) close() {
func (c *Connection) Close(err error) {
c.Lock()
if c.closed {
c.Unlock()

@@ -217,7 +226,7 @@ func (c *Connection) close() {
c.awaiting = nil
c.Unlock()

c.receiver.Close(c.ID)
c.receiver.Close(c.ID, err)
}

func (c *Connection) isClosed() bool {

@@ -227,59 +236,63 @@ func (c *Connection) isClosed() bool {
}

func (c *Connection) readerLoop() {
for !c.isClosed() {
loop:
for {
hdr := c.mreader.readHeader()
if c.mreader.err != nil {
c.close()
break
c.Close(c.mreader.err)
break loop
}
if hdr.version != 0 {
log.Printf("Protocol error: %s: unknown message version %#x", c.ID, hdr.version)
c.close()
break
c.Close(fmt.Errorf("Protocol error: %s: unknown message version %#x", c.ID, hdr.version))
break loop
}

c.lastReceiveLock.Lock()
c.lastReceive = time.Now()
c.lastReceiveLock.Unlock()

switch hdr.msgType {
case messageTypeIndex:
files := c.mreader.readIndex()
if c.mreader.err != nil {
c.close()
c.Close(c.mreader.err)
break loop
} else {
c.receiver.Index(c.ID, files)
}
c.Lock()
c.hasRecvdIndex = true
c.Unlock()

case messageTypeIndexUpdate:
files := c.mreader.readIndex()
if c.mreader.err != nil {
c.close()
c.Close(c.mreader.err)
break loop
} else {
c.receiver.IndexUpdate(c.ID, files)
}

case messageTypeRequest:
c.processRequest(hdr.msgID)
req := c.mreader.readRequest()
if c.mreader.err != nil {
c.Close(c.mreader.err)
break loop
}
go c.processRequest(hdr.msgID, req)

case messageTypeResponse:
data := c.mreader.readResponse()

if c.mreader.err != nil {
c.close()
c.Close(c.mreader.err)
break loop
} else {
c.RLock()
c.Lock()
rc, ok := c.awaiting[hdr.msgID]
c.RUnlock()
delete(c.awaiting, hdr.msgID)
c.Unlock()

if ok {
rc <- asyncResult{data, c.mreader.err}
close(rc)

c.Lock()
delete(c.awaiting, hdr.msgID)
c.Unlock()
}
}
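The reader loop now breaks out of the enclosing for via the loop label, because a plain break inside a switch only leaves the switch. A minimal illustration of that Go detail (generic example, not the project's code):

```go
package main

import "fmt"

func main() {
	// The label lets the loop terminate on a fatal condition even from
	// inside a switch case, where a bare break would only exit the switch.
loop:
	for i := 0; ; i++ {
		switch {
		case i < 3:
			fmt.Println("message", i)
		default:
			break loop // exits the for loop, not just the switch
		}
	}
	fmt.Println("connection closed")
}
```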
@@ -288,8 +301,12 @@ func (c *Connection) readerLoop() {
c.mwriter.writeUint32(encodeHeader(header{0, hdr.msgID, messageTypePong}))
err := c.flush()
c.Unlock()
if err != nil || c.mwriter.err != nil {
c.close()
if err != nil {
c.Close(err)
break loop
} else if c.mwriter.err != nil {
c.Close(c.mwriter.err)
break loop
}

case messageTypePong:

@@ -307,56 +324,51 @@ func (c *Connection) readerLoop() {
}

default:
log.Printf("Protocol error: %s: unknown message type %#x", c.ID, hdr.msgType)
c.close()
c.Close(fmt.Errorf("Protocol error: %s: unknown message type %#x", c.ID, hdr.msgType))
break loop
}
}
}

func (c *Connection) processRequest(msgID int) {
req := c.mreader.readRequest()
if c.mreader.err != nil {
c.close()
} else {
go func() {
data, _ := c.receiver.Request(c.ID, req.name, req.offset, req.size, req.hash)
c.Lock()
c.mwriter.writeUint32(encodeHeader(header{0, msgID, messageTypeResponse}))
c.mwriter.writeResponse(data)
err := c.flush()
c.Unlock()
buffers.Put(data)
if c.mwriter.err != nil || err != nil {
c.close()
}
}()
func (c *Connection) processRequest(msgID int, req request) {
data, _ := c.receiver.Request(c.ID, req.name, req.offset, req.size, req.hash)

c.Lock()
c.mwriter.writeUint32(encodeHeader(header{0, msgID, messageTypeResponse}))
c.mwriter.writeResponse(data)
err := c.flush()
c.Unlock()

buffers.Put(data)
if err != nil {
c.Close(err)
} else if c.mwriter.err != nil {
c.Close(c.mwriter.err)
}
}

func (c *Connection) pingerLoop() {
var rc = make(chan time.Duration, 1)
for !c.isClosed() {
c.lastReceiveLock.RLock()
lr := c.lastReceive
c.lastReceiveLock.RUnlock()
var rc = make(chan bool, 1)
for {
time.Sleep(pingIdleTime / 2)

if time.Since(lr) > pingIdleTime {
c.RLock()
ready := c.hasRecvdIndex && c.hasSentIndex
c.RUnlock()

if ready {
go func() {
t, ok := c.Ping()
if ok {
rc <- t
}
rc <- c.Ping()
}()
select {
case lat := <-rc:
c.Lock()
c.peerLatency = (c.peerLatency + lat) / 2
c.Unlock()
case ok := <-rc:
if !ok {
c.Close(fmt.Errorf("Ping failure"))
}
case <-time.After(pingTimeout):
c.close()
c.Close(fmt.Errorf("Ping timeout"))
}
}
time.Sleep(time.Second)
}
}
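The new pingerLoop fires the ping in its own goroutine and then selects between the reply and a timeout, closing the connection on either a failed ping or no answer within pingTimeout. A compact sketch of one such iteration, with a fake peer and a much shorter timeout purely for demonstration:

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

const pingTimeout = 2 * time.Second // the real constant is 2 * time.Minute

// ping stands in for Connection.Ping; here it simply succeeds after a delay.
func ping() bool {
	time.Sleep(500 * time.Millisecond)
	return true
}

// pingOnce runs a single keep-alive check: fire the ping in its own
// goroutine and give up if no answer arrives within pingTimeout.
func pingOnce() error {
	rc := make(chan bool, 1)
	go func() { rc <- ping() }()

	select {
	case ok := <-rc:
		if !ok {
			return errors.New("ping failure")
		}
		return nil
	case <-time.After(pingTimeout):
		return errors.New("ping timeout")
	}
}

func main() {
	fmt.Println(pingOnce()) // prints <nil> when the peer answers in time
}
```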
@@ -366,7 +378,6 @@ type Statistics struct {
InBytesPerSec int
OutBytesTotal int
OutBytesPerSec int
Latency time.Duration
}

func (c *Connection) Statistics() Statistics {

@@ -382,7 +393,6 @@ func (c *Connection) Statistics() Statistics {
InBytesPerSec: int(float64(rt-c.lastStatistics.InBytesTotal) / secs),
OutBytesTotal: wt,
OutBytesPerSec: int(float64(wt-c.lastStatistics.OutBytesTotal) / secs),
Latency: c.peerLatency,
}
c.lastStatistics = stats
return stats

@@ -46,10 +46,10 @@ func TestPing(t *testing.T) {
c0 := NewConnection("c0", ar, bw, nil)
c1 := NewConnection("c1", br, aw, nil)

if _, ok := c0.Ping(); !ok {
if ok := c0.Ping(); !ok {
t.Error("c0 ping failed")
}
if _, ok := c1.Ping(); !ok {
if ok := c1.Ping(); !ok {
t.Error("c1 ping failed")
}
}

@@ -70,7 +70,7 @@ func TestPingErr(t *testing.T) {
c0 := NewConnection("c0", ar, ebw, m0)
NewConnection("c1", br, eaw, m1)

_, res := c0.Ping()
res := c0.Ping()
if (i < 4 || j < 4) && res {
t.Errorf("Unexpected ping success; i=%d, j=%d", i, j)
} else if (i >= 8 && j >= 8) && !res {

@@ -190,7 +190,7 @@ func TestClose(t *testing.T) {
c0 := NewConnection("c0", ar, bw, m0)
NewConnection("c1", br, aw, m1)

c0.close()
c0.Close(nil)

ok := c0.isClosed()
if !ok {

@@ -199,7 +199,7 @@ func TestClose(t *testing.T) {

// None of these should panic, some should return an error

_, ok = c0.Ping()
ok = c0.Ping()
if ok {
t.Error("Ping should not return true")
}
tls.go (4 lines changed)
@@ -58,11 +58,11 @@ func newCertificate(dir string) {
fatalErr(err)
pem.Encode(certOut, &pem.Block{Type: "CERTIFICATE", Bytes: derBytes})
certOut.Close()
okln("wrote cert.pem")
okln("Created TLS certificate file")

keyOut, err := os.OpenFile(path.Join(dir, "key.pem"), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0600)
fatalErr(err)
pem.Encode(keyOut, &pem.Block{Type: "RSA PRIVATE KEY", Bytes: x509.MarshalPKCS1PrivateKey(priv)})
keyOut.Close()
okln("wrote key.pem")
okln("Created TLS key file")
}
walk.go (19 lines changed)
@@ -45,7 +45,8 @@ func tempName(name string, modified int64) string {
func genWalker(base string, res *[]File, model *Model) filepath.WalkFunc {
return func(p string, info os.FileInfo, err error) error {
if err != nil {
return err
warnln(err)
return nil
}

if isTempName(p) {

@@ -55,12 +56,14 @@ func genWalker(base string, res *[]File, model *Model) filepath.WalkFunc {
if info.Mode()&os.ModeType == 0 {
rn, err := filepath.Rel(base, p)
if err != nil {
return err
warnln(err)
return nil
}

fi, err := os.Stat(p)
if err != nil {
return err
warnln(err)
return nil
}
modified := fi.ModTime().Unix()

@@ -74,13 +77,15 @@ func genWalker(base string, res *[]File, model *Model) filepath.WalkFunc {
}
fd, err := os.Open(p)
if err != nil {
return err
warnln(err)
return nil
}
defer fd.Close()

blocks, err := Blocks(fd, BlockSize)
if err != nil {
return err
warnln(err)
return nil
}
f := File{
Name: rn,

@@ -110,12 +115,14 @@ func Walk(dir string, model *Model, followSymlinks bool) []File {
warnln(err)
return files
}
defer d.Close()

fis, err := d.Readdir(-1)
if err != nil {
warnln(err)
return files
}
d.Close()

for _, fi := range fis {
if fi.Mode()&os.ModeSymlink != 0 {
err := filepath.Walk(path.Join(dir, fi.Name())+"/", fn)
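The walker now logs problems and returns nil, so a single unreadable file no longer aborts the whole repository scan; returning a non-nil error from a filepath.WalkFunc stops the walk entirely. A small standalone example of that log-and-continue behaviour:

```go
package main

import (
	"log"
	"os"
	"path/filepath"
)

func main() {
	// Log problems and keep walking instead of aborting the whole scan:
	// returning a non-nil error from the WalkFunc stops filepath.Walk,
	// while logging and returning nil skips just the offending entry.
	err := filepath.Walk(".", func(p string, info os.FileInfo, err error) error {
		if err != nil {
			log.Println("WARNING:", err)
			return nil
		}
		log.Println("scanned", p)
		return nil
	})
	if err != nil {
		log.Fatal(err)
	}
}
```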