Compare commits

..

43 Commits

Author SHA1 Message Date
Jakob Borg
3ed783983f Don't build stcli by default 2014-05-11 15:40:14 -03:00
Jakob Borg
837f3a68ab Sort repos on directory (fixes #178) 2014-05-11 15:25:06 -03:00
Jakob Borg
59e45c5c68 Auto assign ports for GUI and BEP on initial startup (fixes #176) 2014-05-11 15:21:41 -03:00
Jakob Borg
94761d0472 Don't warn about legit requests for deleted files (fixes #173) 2014-05-11 14:54:26 -03:00
Jakob Borg
a91eb701bf Reload configuration after lost connection or restart. 2014-05-11 14:43:38 -03:00
Jakob Borg
1a1f118f1a Restructure protocol code with less locking 2014-05-11 14:30:29 -03:00
Jakob Borg
b115fca8a9 Increase ping timeout 2014-05-11 14:30:15 -03:00
Jakob Borg
dfd239ac06 Also build windows-386 and linux-armv5 2014-05-06 08:13:56 -03:00
Jakob Borg
2ae218d069 Rephrase Read Only into Repository Master (fixes #159) 2014-05-04 20:01:33 -03:00
Jakob Borg
1401d2ee9b Remove refereces to JS source mapping files 2014-05-04 18:27:09 +02:00
Jakob Borg
f39e105101 Stop repository if the directory disappears (fixes #154) 2014-05-04 18:22:59 +02:00
Jakob Borg
482795bab0 Streamline error handling and locking, with fix for close() race 2014-05-04 18:22:25 +02:00
Jakob Borg
10e8861f14 Smarter initial index sending 2014-05-04 18:21:57 +02:00
Jakob Borg
ecc6476308 Revert "Fix protocol close test"
This reverts commit 92c1ce57a6.
2014-05-04 08:16:45 +02:00
Jakob Borg
28e347002a Revert "Streamline error handling and locking" (fixes #172)
This reverts commit 116f232f5a.
2014-05-04 08:11:06 +02:00
Jakob Borg
b3d19bd5cc Run vet when building 2014-05-02 21:59:36 +02:00
Jakob Borg
647165ab89 Remove dead code 2014-05-02 21:59:18 +02:00
Jakob Borg
6807d9bd4c Fix upgrade non-support on Windows 2014-05-02 20:19:21 +02:00
Jakob Borg
699ecc7140 Some places should use RLock instead of Lock (ref #169) 2014-05-02 17:15:04 +02:00
Jakob Borg
b374ec9355 Save temporary in correct dir during upgrade 2014-05-02 17:04:45 +02:00
Jakob Borg
9659d021cb Merge pull request #170 from andrew-d/patch-1
Fix typo in header name
2014-05-02 16:59:08 +02:00
Jakob Borg
a4ad9eb134 Add andrew-d 2014-05-02 16:58:55 +02:00
Andrew
a455258a62 Fix typo in header name 2014-05-02 01:26:12 -07:00
Jakob Borg
0ae342673a Update saved dependencies 2014-05-02 10:05:48 +02:00
Jakob Borg
33d75a264d Built in upgrade functionality 2014-05-02 10:01:09 +02:00
Jakob Borg
89dc5bb951 Windows doesn't have SysProcAttr 2014-05-02 08:57:34 +02:00
Jakob Borg
45403917de Minor cleanup in discovery 2014-05-02 08:53:19 +02:00
Jakob Borg
ed476271a6 Start xdg-open in new process group (fixes #164) 2014-05-02 08:53:05 +02:00
Jakob Borg
1e92c47960 Don't bother starting without GUI (fixes #156) 2014-04-30 22:52:38 +02:00
Jakob Borg
4f2fe07ae4 Show node ID in regular text not disabled control (fixes #162) 2014-04-30 22:42:39 +02:00
Jakob Borg
aff3cd01c5 Don't show Offline badge when global disco is disabled (fixes #167) 2014-04-30 22:17:43 +02:00
Jakob Borg
ac74ee1468 Don't redirect to absolute URL (fixes #166) 2014-04-30 22:10:13 +02:00
Jakob Borg
0d55cf4be5 Don't use absolute URL for rest calls (fixes #166) 2014-04-30 22:02:34 +02:00
Jakob Borg
5399a25532 Getting started 2014-04-30 16:13:29 +02:00
Jakob Borg
ae882c93c9 Links to discourse 2014-04-30 15:14:42 +02:00
Jakob Borg
f398ca77c1 Better trace output from mc 2014-04-30 15:13:54 +02:00
Jakob Borg
dcd7d278aa Handle and indicate duplicate repo ID:s (fixes #153) 2014-04-27 21:53:27 +02:00
Jakob Borg
89f5f3bf9a Fix small data races 2014-04-27 21:33:57 +02:00
Jakob Borg
76ef42ee07 No drone.io badge 2014-04-27 13:37:53 +02:00
Jakob Borg
92c1ce57a6 Fix protocol close test 2014-04-27 13:25:35 +02:00
Jakob Borg
116f232f5a Streamline error handling and locking 2014-04-27 13:10:50 +02:00
Jakob Borg
ef81a36654 Extract method closeFile 2014-04-27 12:14:53 +02:00
Jakob Borg
9fd2724d73 Simplify requestSlots filling 2014-04-27 12:06:11 +02:00
44 changed files with 1290 additions and 665 deletions

View File

@@ -1,6 +1,6 @@
Please do contribute! If you want to contribute but are unsure where to
start, the [Contributions Needed
page](https://github.com/calmh/syncthing/wiki/Contributions-Needed)
topic](http://discourse.syncthing.net/t/contributions-needed/49)
lists areas in need of attention.
## Licensing
@@ -15,7 +15,8 @@ will ensure that you are added to the CONTRIBUTORS file.
## Building
[See the wiki](https://github.com/calmh/syncthing/wiki/Building)
[See the
documentation](http://discourse.syncthing.net/t/building-syncthing/44)
## Branches
@@ -46,7 +47,7 @@ Yes please!
## Documentation
[Hack it here](https://github.com/calmh/syncthing/wiki)
[Over here!](http://discourse.syncthing.net/category/documentation)
## License

View File

@@ -1,4 +1,5 @@
Aaron Bieber <qbit@deftly.net>
Andrew Dunham <andrew@du.nham.ca>
Brandon Philips <brandon@ifup.org>
James Patterson <jamespatterson@operamail.com>
Philippe Schommers <philippe@schommers.be>

5
Godeps/Godeps.json generated
View File

@@ -8,6 +8,11 @@
"./discover/cmd/discosrv"
],
"Deps": [
{
"ImportPath": "bitbucket.org/kardianos/osext",
"Comment": "null-9",
"Rev": "364fb577de68fb646c4cb39cc0e09c887ee16376"
},
{
"ImportPath": "code.google.com/p/go.crypto/bcrypt",
"Comment": "null-185",

View File

@@ -0,0 +1,20 @@
Copyright (c) 2012 Daniel Theophanes
This software is provided 'as-is', without any express or implied
warranty. In no event will the authors be held liable for any damages
arising from the use of this software.
Permission is granted to anyone to use this software for any purpose,
including commercial applications, and to alter it and redistribute it
freely, subject to the following restrictions:
1. The origin of this software must not be misrepresented; you must not
claim that you wrote the original software. If you use this software
in a product, an acknowledgment in the product documentation would be
appreciated but is not required.
2. Altered source versions must be plainly marked as such, and must not be
misrepresented as being the original software.
3. This notice may not be removed or altered from any source
distribution.

View File

@@ -0,0 +1,32 @@
// Copyright 2012 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Extensions to the standard "os" package.
package osext
import "path/filepath"
// Executable returns an absolute path that can be used to
// re-invoke the current program.
// It may not be valid after the current program exits.
func Executable() (string, error) {
p, err := executable()
return filepath.Clean(p), err
}
// Returns same path as Executable, returns just the folder
// path. Excludes the executable name.
func ExecutableFolder() (string, error) {
p, err := Executable()
if err != nil {
return "", err
}
folder, _ := filepath.Split(p)
return folder, nil
}
// Depricated. Same as Executable().
func GetExePath() (exePath string, err error) {
return Executable()
}

View File

@@ -0,0 +1,16 @@
// Copyright 2012 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package osext
import "syscall"
func executable() (string, error) {
f, err := Open("/proc/" + itoa(Getpid()) + "/text")
if err != nil {
return "", err
}
defer f.Close()
return syscall.Fd2path(int(f.Fd()))
}

View File

@@ -0,0 +1,25 @@
// Copyright 2012 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// +build linux netbsd openbsd
package osext
import (
"errors"
"os"
"runtime"
)
func executable() (string, error) {
switch runtime.GOOS {
case "linux":
return os.Readlink("/proc/self/exe")
case "netbsd":
return os.Readlink("/proc/curproc/exe")
case "openbsd":
return os.Readlink("/proc/curproc/file")
}
return "", errors.New("ExecPath not implemented for " + runtime.GOOS)
}

View File

@@ -0,0 +1,82 @@
// Copyright 2012 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// +build darwin freebsd
package osext
import (
"os"
"path/filepath"
"runtime"
"syscall"
"unsafe"
)
var startUpcwd, getwdError = os.Getwd()
func executable() (string, error) {
var mib [4]int32
switch runtime.GOOS {
case "freebsd":
mib = [4]int32{1 /* CTL_KERN */, 14 /* KERN_PROC */, 12 /* KERN_PROC_PATHNAME */, -1}
case "darwin":
mib = [4]int32{1 /* CTL_KERN */, 38 /* KERN_PROCARGS */, int32(os.Getpid()), -1}
}
n := uintptr(0)
// get length
_, _, err := syscall.Syscall6(syscall.SYS___SYSCTL, uintptr(unsafe.Pointer(&mib[0])), 4, 0, uintptr(unsafe.Pointer(&n)), 0, 0)
if err != 0 {
return "", err
}
if n == 0 { // shouldn't happen
return "", nil
}
buf := make([]byte, n)
_, _, err = syscall.Syscall6(syscall.SYS___SYSCTL, uintptr(unsafe.Pointer(&mib[0])), 4, uintptr(unsafe.Pointer(&buf[0])), uintptr(unsafe.Pointer(&n)), 0, 0)
if err != 0 {
return "", err
}
if n == 0 { // shouldn't happen
return "", nil
}
for i, v := range buf {
if v == 0 {
buf = buf[:i]
break
}
}
var strpath string
if buf[0] != '/' {
var e error
if strpath, e = getAbs(buf); e != nil {
return strpath, e
}
} else {
strpath = string(buf)
}
// darwin KERN_PROCARGS may return the path to a symlink rather than the
// actual executable
if runtime.GOOS == "darwin" {
if strpath, err := filepath.EvalSymlinks(strpath); err != nil {
return strpath, err
}
}
return strpath, nil
}
func getAbs(buf []byte) (string, error) {
if getwdError != nil {
return string(buf), getwdError
} else {
if buf[0] == '.' {
buf = buf[1:]
}
if startUpcwd[len(startUpcwd)-1] != '/' && buf[0] != '/' {
return startUpcwd + "/" + string(buf), nil
}
return startUpcwd + string(buf), nil
}
}

View File

@@ -0,0 +1,79 @@
// Copyright 2012 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// +build darwin linux freebsd netbsd windows
package osext
import (
"fmt"
"os"
oexec "os/exec"
"path/filepath"
"runtime"
"testing"
)
const execPath_EnvVar = "OSTEST_OUTPUT_EXECPATH"
func TestExecPath(t *testing.T) {
ep, err := Executable()
if err != nil {
t.Fatalf("ExecPath failed: %v", err)
}
// we want fn to be of the form "dir/prog"
dir := filepath.Dir(filepath.Dir(ep))
fn, err := filepath.Rel(dir, ep)
if err != nil {
t.Fatalf("filepath.Rel: %v", err)
}
cmd := &oexec.Cmd{}
// make child start with a relative program path
cmd.Dir = dir
cmd.Path = fn
// forge argv[0] for child, so that we can verify we could correctly
// get real path of the executable without influenced by argv[0].
cmd.Args = []string{"-", "-test.run=XXXX"}
cmd.Env = []string{fmt.Sprintf("%s=1", execPath_EnvVar)}
out, err := cmd.CombinedOutput()
if err != nil {
t.Fatalf("exec(self) failed: %v", err)
}
outs := string(out)
if !filepath.IsAbs(outs) {
t.Fatalf("Child returned %q, want an absolute path", out)
}
if !sameFile(outs, ep) {
t.Fatalf("Child returned %q, not the same file as %q", out, ep)
}
}
func sameFile(fn1, fn2 string) bool {
fi1, err := os.Stat(fn1)
if err != nil {
return false
}
fi2, err := os.Stat(fn2)
if err != nil {
return false
}
return os.SameFile(fi1, fi2)
}
func init() {
if e := os.Getenv(execPath_EnvVar); e != "" {
// first chdir to another path
dir := "/"
if runtime.GOOS == "windows" {
dir = filepath.VolumeName(".")
}
os.Chdir(dir)
if ep, err := Executable(); err != nil {
fmt.Fprint(os.Stderr, "ERROR: ", err)
} else {
fmt.Fprint(os.Stderr, ep)
}
os.Exit(0)
}
}

View File

@@ -0,0 +1,34 @@
// Copyright 2012 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package osext
import (
"syscall"
"unicode/utf16"
"unsafe"
)
var (
kernel = syscall.MustLoadDLL("kernel32.dll")
getModuleFileNameProc = kernel.MustFindProc("GetModuleFileNameW")
)
// GetModuleFileName() with hModule = NULL
func executable() (exePath string, err error) {
return getModuleFileName()
}
func getModuleFileName() (string, error) {
var n uint32
b := make([]uint16, syscall.MAX_PATH)
size := uint32(len(b))
r0, _, e1 := getModuleFileNameProc.Call(0, uintptr(unsafe.Pointer(&b[0])), uintptr(size))
n = uint32(r0)
if n == 0 {
return "", e1
}
return string(utf16.Decode(b[0:n])), nil
}

View File

@@ -1,4 +1,4 @@
syncthing [![Build Status](https://drone.io/github.com/calmh/syncthing/status.png)](https://drone.io/github.com/calmh/syncthing/latest)
syncthing
=========
This is the `syncthing` project. The following are the project goals:
@@ -25,6 +25,11 @@ making sure large swarms of selfish agents behave and somehow work
towards a common goal. Here we have a much smaller swarm of cooperative
agents and a simpler approach will suffice.
Getting Started
---------------
Take a look at the [getting started guide](http://discourse.syncthing.net/t/getting-started/46).
Signed Releases
---------------
@@ -35,8 +40,9 @@ normal release bundle as `syncthing.asc` or `syncthing.exe.asc`.
Documentation
=============
The syncthing documentation is kept on the
[GitHub Wiki](https://github.com/calmh/syncthing/wiki).
The [syncthing
documentation](http://discourse.syncthing.net/category/documentation) is
on the discourse site.
License
=======

View File

File diff suppressed because one or more lines are too long

View File

@@ -11,6 +11,8 @@ host=${host%%.*}
ldflags="-w -X main.Version $version -X main.BuildStamp $date -X main.BuildUser $user -X main.BuildHost $host"
build() {
go vet ./... || exit 1
if command -v godep >/dev/null ; then
godep=godep
else
@@ -20,7 +22,6 @@ build() {
godep=
fi
${godep} go build $* -ldflags "$ldflags" ./cmd/syncthing
${godep} go build -ldflags "$ldflags" ./cmd/stcli
}
assets() {
@@ -62,7 +63,7 @@ zipDist() {
}
deps() {
godep save ./cmd/syncthing ./cmd/assets ./cmd/stcli ./discover/cmd/discosrv
godep save ./cmd/syncthing ./cmd/assets ./discover/cmd/discosrv
}
case "$1" in
@@ -96,7 +97,7 @@ case "$1" in
test || exit 1
assets
for os in darwin-amd64 linux-386 linux-amd64 freebsd-amd64 windows-amd64 ; do
for os in darwin-amd64 linux-386 linux-amd64 freebsd-amd64 windows-amd64 windows-386 ; do
export GOOS=${os%-*}
export GOARCH=${os#*-}
@@ -126,6 +127,10 @@ case "$1" in
build
tarDist "syncthing-linux-armv6-$version"
export GOARM=5
build
tarDist "syncthing-linux-armv5-$version"
;;
upload)

View File

@@ -1,72 +0,0 @@
package main
import (
"fmt"
"log"
"os"
)
var logger *log.Logger
func init() {
log.SetOutput(os.Stderr)
logger = log.New(os.Stderr, "", log.Flags())
}
func debugln(vals ...interface{}) {
s := fmt.Sprintln(vals...)
logger.Output(2, "DEBUG: "+s)
}
func debugf(format string, vals ...interface{}) {
s := fmt.Sprintf(format, vals...)
logger.Output(2, "DEBUG: "+s)
}
func infoln(vals ...interface{}) {
s := fmt.Sprintln(vals...)
logger.Output(2, "INFO: "+s)
}
func infof(format string, vals ...interface{}) {
s := fmt.Sprintf(format, vals...)
logger.Output(2, "INFO: "+s)
}
func okln(vals ...interface{}) {
s := fmt.Sprintln(vals...)
logger.Output(2, "OK: "+s)
}
func okf(format string, vals ...interface{}) {
s := fmt.Sprintf(format, vals...)
logger.Output(2, "OK: "+s)
}
func warnln(vals ...interface{}) {
s := fmt.Sprintln(vals...)
logger.Output(2, "WARNING: "+s)
}
func warnf(format string, vals ...interface{}) {
s := fmt.Sprintf(format, vals...)
logger.Output(2, "WARNING: "+s)
}
func fatalln(vals ...interface{}) {
s := fmt.Sprintln(vals...)
logger.Output(2, "FATAL: "+s)
os.Exit(3)
}
func fatalf(format string, vals ...interface{}) {
s := fmt.Sprintf(format, vals...)
logger.Output(2, "FATAL: "+s)
os.Exit(3)
}
func fatalErr(err error) {
if err != nil {
fatalf(err.Error())
}
}

View File

@@ -1,24 +1,11 @@
package main
import (
"crypto/rand"
"crypto/rsa"
"crypto/sha256"
"crypto/tls"
"crypto/x509"
"crypto/x509/pkix"
"encoding/base32"
"encoding/pem"
"math/big"
"os"
"path/filepath"
"strings"
"time"
)
const (
tlsRSABits = 3072
tlsName = "syncthing"
)
func loadCert(dir string) (tls.Certificate, error) {
@@ -31,41 +18,3 @@ func certID(bs []byte) string {
id := hf.Sum(nil)
return strings.Trim(base32.StdEncoding.EncodeToString(id), "=")
}
func newCertificate(dir string) {
infoln("Generating RSA certificate and key...")
priv, err := rsa.GenerateKey(rand.Reader, tlsRSABits)
fatalErr(err)
notBefore := time.Now()
notAfter := time.Date(2049, 12, 31, 23, 59, 59, 0, time.UTC)
template := x509.Certificate{
SerialNumber: new(big.Int).SetInt64(0),
Subject: pkix.Name{
CommonName: tlsName,
},
NotBefore: notBefore,
NotAfter: notAfter,
KeyUsage: x509.KeyUsageKeyEncipherment | x509.KeyUsageDigitalSignature,
ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth, x509.ExtKeyUsageClientAuth},
BasicConstraintsValid: true,
}
derBytes, err := x509.CreateCertificate(rand.Reader, &template, &template, &priv.PublicKey, priv)
fatalErr(err)
certOut, err := os.Create(filepath.Join(dir, "cert.pem"))
fatalErr(err)
pem.Encode(certOut, &pem.Block{Type: "CERTIFICATE", Bytes: derBytes})
certOut.Close()
okln("Created RSA certificate file")
keyOut, err := os.OpenFile(filepath.Join(dir, "key.pem"), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0600)
fatalErr(err)
pem.Encode(keyOut, &pem.Block{Type: "RSA PRIVATE KEY", Bytes: x509.MarshalPKCS1PrivateKey(priv)})
keyOut.Close()
okln("Created RSA key file")
}

View File

@@ -1,6 +1,10 @@
package main
import "github.com/calmh/syncthing/scanner"
import (
"sync/atomic"
"github.com/calmh/syncthing/scanner"
)
type bqAdd struct {
file scanner.File
@@ -20,6 +24,7 @@ type blockQueue struct {
outbox chan bqBlock
queued []bqBlock
qlen uint32
}
func newBlockQueue() *blockQueue {
@@ -77,6 +82,7 @@ func (q *blockQueue) run() {
q.queued = q.queued[1:]
}
}
atomic.StoreUint32(&q.qlen, uint32(len(q.queued)))
}
}
@@ -89,6 +95,7 @@ func (q *blockQueue) get() bqBlock {
}
func (q *blockQueue) empty() bool {
// There is a race condition here. We're only mostly sure the queue is empty if the expression below is true.
return len(q.queued) == 0 && len(q.inbox) == 0 && len(q.outbox) == 0
var l uint32
atomic.LoadUint32(&l)
return l == 0
}

View File

@@ -25,6 +25,7 @@ type RepositoryConfiguration struct {
Directory string `xml:"directory,attr"`
Nodes []NodeConfiguration `xml:"node"`
ReadOnly bool `xml:"ro,attr"`
Invalid string `xml:"-"` // Set at runtime when there is an error, not saved
nodeIDs []string
}
@@ -171,17 +172,21 @@ func readConfigXML(rd io.Reader, myID string) (Configuration, error) {
cfg.Options.ListenAddress = uniqueStrings(cfg.Options.ListenAddress)
// Check for missing or duplicate repository ID:s
var seenRepos = map[string]bool{}
var seenRepos = map[string]*RepositoryConfiguration{}
for i := range cfg.Repositories {
if cfg.Repositories[i].ID == "" {
cfg.Repositories[i].ID = "default"
repo := &cfg.Repositories[i]
if repo.ID == "" {
repo.ID = "default"
}
id := cfg.Repositories[i].ID
if seenRepos[id] {
panic("duplicate repository ID " + id)
if seen, ok := seenRepos[repo.ID]; ok {
seen.Invalid = "duplicate repository ID"
repo.Invalid = "duplicate repository ID"
warnf("Multiple repositories with ID %q; disabling", repo.ID)
} else {
seenRepos[repo.ID] = repo
}
seenRepos[id] = true
}
// Upgrade to v2 configuration if appropriate
@@ -280,3 +285,13 @@ func ensureNodePresent(nodes []NodeConfiguration, myID string) []NodeConfigurati
return nodes
}
func invalidateRepo(repoID string, err error) {
for i := range cfg.Repositories {
repo := &cfg.Repositories[i]
if repo.ID == repoID {
repo.Invalid = err.Error()
return
}
}
}

View File

@@ -5,14 +5,15 @@ import (
"encoding/base64"
"encoding/json"
"io/ioutil"
"log"
"math/rand"
"net"
"net/http"
"runtime"
"sync"
"time"
"code.google.com/p/go.crypto/bcrypt"
"github.com/calmh/syncthing/scanner"
"github.com/codegangsta/martini"
)
@@ -25,13 +26,20 @@ var (
configInSync = true
guiErrors = []guiError{}
guiErrorsMut sync.Mutex
static = embeddedStatic()
staticFunc = static.(func(http.ResponseWriter, *http.Request, *log.Logger))
)
const (
unchangedPassword = "--password-unchanged--"
)
func startGUI(cfg GUIConfiguration, m *Model) {
func startGUI(cfg GUIConfiguration, m *Model) error {
l, err := net.Listen("tcp", cfg.Address)
if err != nil {
return err
}
router := martini.NewRouter()
router.Get("/", getRoot)
router.Get("/rest/version", restGetVersion)
@@ -48,25 +56,24 @@ func startGUI(cfg GUIConfiguration, m *Model) {
router.Post("/rest/error", restPostError)
router.Post("/rest/error/clear", restClearErrors)
go func() {
mr := martini.New()
if len(cfg.User) > 0 && len(cfg.Password) > 0 {
mr.Use(basic(cfg.User, cfg.Password))
}
mr.Use(embeddedStatic())
mr.Use(martini.Recovery())
mr.Use(restMiddleware)
mr.Action(router.Handle)
mr.Map(m)
err := http.ListenAndServe(cfg.Address, mr)
if err != nil {
warnln("GUI not possible:", err)
}
}()
mr := martini.New()
if len(cfg.User) > 0 && len(cfg.Password) > 0 {
mr.Use(basic(cfg.User, cfg.Password))
}
mr.Use(static)
mr.Use(martini.Recovery())
mr.Use(restMiddleware)
mr.Action(router.Handle)
mr.Map(m)
go http.Serve(l, mr)
return nil
}
func getRoot(w http.ResponseWriter, r *http.Request) {
http.Redirect(w, r, "/index.html", 302)
r.URL.Path = "/index.html"
staticFunc(w, r, nil)
}
func restMiddleware(w http.ResponseWriter, r *http.Request) {
@@ -84,6 +91,13 @@ func restGetModel(m *Model, w http.ResponseWriter, r *http.Request) {
var repo = qs.Get("repo")
var res = make(map[string]interface{})
for _, cr := range cfg.Repositories {
if cr.ID == repo {
res["invalid"] = cr.Invalid
break
}
}
globalFiles, globalDeleted, globalBytes := m.GlobalSize(repo)
res["globalFiles"], res["globalDeleted"], res["globalBytes"] = globalFiles, globalDeleted, globalBytes
@@ -151,23 +165,6 @@ func restPostReset(req *http.Request) {
go restart()
}
type guiFile scanner.File
func (f guiFile) MarshalJSON() ([]byte, error) {
type t struct {
Name string
Size int64
Modified int64
Flags uint32
}
return json.Marshal(t{
Name: f.Name,
Size: scanner.File(f).Size,
Modified: f.Modified,
Flags: f.Flags,
})
}
var cpuUsagePercent [10]float64 // The last ten seconds
var cpuUsageLock sync.RWMutex
@@ -180,7 +177,7 @@ func restGetSystem(w http.ResponseWriter) {
res["goroutines"] = runtime.NumGoroutine()
res["alloc"] = m.Alloc
res["sys"] = m.Sys
if discoverer != nil {
if cfg.Options.GlobalAnnEnabled && discoverer != nil {
res["extAnnounceOK"] = discoverer.ExtAnnounceOK()
}
cpuUsageLock.RLock()

View File

@@ -32,7 +32,7 @@ func embeddedStatic() interface{} {
if len(mtype) != 0 {
res.Header().Set("Content-Type", mtype)
}
res.Header().Set("Content-Size", fmt.Sprintf("%d", len(bs)))
res.Header().Set("Content-Length", fmt.Sprintf("%d", len(bs)))
res.Header().Set("Last-Modified", modt)
res.Write(bs)

View File

@@ -83,9 +83,11 @@ const (
func main() {
var reset bool
var showVersion bool
var doUpgrade bool
flag.StringVar(&confDir, "home", getDefaultConfDir(), "Set configuration directory")
flag.BoolVar(&reset, "reset", false, "Prepare to resync from cluster")
flag.BoolVar(&showVersion, "version", false, "Show version")
flag.BoolVar(&doUpgrade, "upgrade", false, "Perform upgrade")
flag.Usage = usageFor(flag.CommandLine, usage, extraUsage)
flag.Parse()
@@ -99,6 +101,14 @@ func main() {
return
}
if doUpgrade {
err := upgrade()
if err != nil {
fatalln(err)
}
return
}
if len(os.Getenv("GOGC")) == 0 {
debug.SetGCPercent(25)
}
@@ -180,6 +190,14 @@ func main() {
},
}
port, err := getFreePort("127.0.0.1", 8080)
fatalErr(err)
cfg.GUI.Address = fmt.Sprintf("127.0.0.1:%d", port)
port, err = getFreePort("", 22000)
fatalErr(err)
cfg.Options.ListenAddress = []string{fmt.Sprintf(":%d", port)}
saveConfig()
infof("Edit %s to taste or use the GUI\n", cfgFile)
}
@@ -222,6 +240,9 @@ func main() {
m := NewModel(cfg.Options.MaxChangeKbps * 1000)
for _, repo := range cfg.Repositories {
if repo.Invalid != "" {
continue
}
dir := expandTilde(repo.Directory)
m.AddRepo(repo.ID, dir, repo.Nodes)
}
@@ -230,7 +251,7 @@ func main() {
if cfg.GUI.Enabled && cfg.GUI.Address != "" {
addr, err := net.ResolveTCPAddr("tcp", cfg.GUI.Address)
if err != nil {
warnf("Cannot start GUI on %q: %v", cfg.GUI.Address, err)
fatalf("Cannot start GUI on %q: %v", cfg.GUI.Address, err)
} else {
var hostOpen, hostShow string
switch {
@@ -246,7 +267,10 @@ func main() {
}
infof("Starting web GUI on http://%s:%d/", hostShow, addr.Port)
startGUI(cfg.GUI, m)
err := startGUI(cfg.GUI, m)
if err != nil {
fatalln("Cannot start GUI:", err)
}
if cfg.Options.StartBrowser && len(os.Getenv("STRESTART")) == 0 {
openURL(fmt.Sprintf("http://%s:%d", hostOpen, addr.Port))
}
@@ -260,6 +284,10 @@ func main() {
m.LoadIndexes(confDir)
for _, repo := range cfg.Repositories {
if repo.Invalid != "" {
continue
}
dir := expandTilde(repo.Directory)
// Safety check. If the cached index contains files but the repository
@@ -295,6 +323,10 @@ func main() {
go listenConnect(myID, m, tlsCfg)
for _, repo := range cfg.Repositories {
if repo.Invalid != "" {
continue
}
// Routine to pull blocks from other nodes to synchronize the local
// repository. Does not run when we are in read only (publish only) mode.
if repo.ReadOnly {
@@ -638,3 +670,35 @@ func getHomeDir() string {
return home
}
// getFreePort returns a free TCP port fort listening on. The ports given are
// tried in succession and the first to succeed is returned. If none succeed,
// a random high port is returned.
func getFreePort(host string, ports ...int) (int, error) {
for _, port := range ports {
c, err := net.Listen("tcp", fmt.Sprintf("%s:%d", host, port))
if err == nil {
c.Close()
return port, nil
}
}
c, err := net.Listen("tcp", host+":0")
if err != nil {
return 0, err
}
addr := c.Addr().String()
c.Close()
_, portstr, err := net.SplitHostPort(addr)
if err != nil {
return 0, err
}
port, err := strconv.Atoi(portstr)
if err != nil {
return 0, err
}
return port, nil
}

View File

@@ -80,8 +80,8 @@ func NewModel(maxChangeBw int) *Model {
// read/write mode the model will attempt to keep in sync with the cluster by
// pulling needed files from peer nodes.
func (m *Model) StartRepoRW(repo string, threads int) {
m.rmut.Lock()
defer m.rmut.Unlock()
m.rmut.RLock()
defer m.rmut.RUnlock()
if dir, ok := m.repoDirs[repo]; !ok {
panic("cannot start without repo")
@@ -323,13 +323,15 @@ func (m *Model) Request(nodeID, repo, name string, offset int64, size int) ([]by
}
lf := r.Get(cid.LocalID, name)
if offset > lf.Size {
warnf("SECURITY (nonexistent file) REQ(in): %s: %q o=%d s=%d", nodeID, name, offset, size)
return nil, ErrNoSuchFile
if lf.Suppressed || lf.Flags&protocol.FlagDeleted != 0 {
return nil, ErrInvalid
}
if lf.Suppressed {
return nil, ErrInvalid
if offset > lf.Size {
if debugNet {
dlog.Printf("REQ(in; nonexistent): %s: %q o=%d s=%d", nodeID, name, offset, size)
}
return nil, ErrNoSuchFile
}
if debugNet && nodeID != "<local>" {
@@ -423,14 +425,16 @@ func (m *Model) AddConnection(rawConn io.Closer, protoConn protocol.Connection)
cm := m.clusterConfig(nodeID)
protoConn.ClusterConfig(cm)
var idxToSend = make(map[string][]protocol.FileInfo)
m.rmut.RLock()
for _, repo := range m.nodeRepos[nodeID] {
idxToSend[repo] = m.protocolIndex(repo)
}
m.rmut.RUnlock()
go func() {
m.rmut.RLock()
repos := m.nodeRepos[nodeID]
m.rmut.RUnlock()
for _, repo := range repos {
m.rmut.RLock()
idx := m.protocolIndex(repo)
m.rmut.RUnlock()
for repo, idx := range idxToSend {
if debugNet {
dlog.Printf("IDX(out/initial): %s: %q: %d files", nodeID, repo, len(idx))
}
@@ -557,9 +561,9 @@ func (m *Model) ScanRepos() {
}
}
func (m *Model) ScanRepo(repo string) {
func (m *Model) ScanRepo(repo string) error {
sup := &suppressor{threshold: int64(cfg.Options.MaxChangeKbps)}
m.rmut.Lock()
m.rmut.RLock()
w := &scanner.Walker{
Dir: m.repoDirs[repo],
IgnoreFile: ".stignore",
@@ -568,11 +572,15 @@ func (m *Model) ScanRepo(repo string) {
Suppressor: sup,
CurrentFiler: cFiler{m, repo},
}
m.rmut.Unlock()
m.rmut.RUnlock()
m.setState(repo, RepoScanning)
fs, _ := w.Walk()
fs, _, err := w.Walk()
if err != nil {
return err
}
m.ReplaceLocal(repo, fs)
m.setState(repo, RepoIdle)
return nil
}
func (m *Model) SaveIndexes(dir string) {
@@ -648,7 +656,7 @@ func (m *Model) clusterConfig(node string) protocol.ClusterConfigMessage {
ClientVersion: Version,
}
m.rmut.Lock()
m.rmut.RLock()
for _, repo := range m.nodeRepos[node] {
cr := protocol.Repository{
ID: repo,
@@ -662,7 +670,7 @@ func (m *Model) clusterConfig(node string) protocol.ClusterConfigMessage {
}
cm.Repositories = append(cm.Repositories, cr)
}
m.rmut.Unlock()
m.rmut.RUnlock()
return cm
}
@@ -674,9 +682,9 @@ func (m *Model) setState(repo string, state repoState) {
}
func (m *Model) State(repo string) string {
m.rmut.Lock()
m.rmut.RLock()
state := m.repoState[repo]
m.rmut.Unlock()
m.rmut.RUnlock()
switch state {
case RepoIdle:
return "idle"

View File

@@ -1,34 +0,0 @@
/*
Copyright 2011 Google Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package main
import (
"os/exec"
"runtime"
)
func openURL(url string) error {
if runtime.GOOS == "windows" {
return exec.Command("cmd.exe", "/C", "start "+url).Run()
}
if runtime.GOOS == "darwin" {
return exec.Command("open", url).Run()
}
return exec.Command("xdg-open", url).Run()
}

View File

@@ -0,0 +1,23 @@
// +build !windows
package main
import (
"os/exec"
"runtime"
"syscall"
)
func openURL(url string) error {
switch runtime.GOOS {
case "darwin":
return exec.Command("open", url).Run()
default:
cmd := exec.Command("xdg-open", url)
cmd.SysProcAttr = &syscall.SysProcAttr{
Setpgid: true,
}
return cmd.Run()
}
}

View File

@@ -0,0 +1,9 @@
// +build windows
package main
import "os/exec"
func openURL(url string) error {
return exec.Command("cmd.exe", "/C", "start "+url).Run()
}

View File

@@ -135,7 +135,10 @@ func (p *puller) run() {
case b := <-p.blocks:
p.model.setState(p.repo, RepoSyncing)
changed = true
p.handleBlock(b)
if p.handleBlock(b) {
// Block was fully handled, free up the slot
p.requestSlots <- true
}
case <-timeout:
if len(p.openFiles) == 0 && p.bq.empty() {
@@ -170,7 +173,11 @@ func (p *puller) run() {
if debugPull {
dlog.Printf("%q: time for rescan", p.repo)
}
p.model.ScanRepo(p.repo)
err := p.model.ScanRepo(p.repo)
if err != nil {
invalidateRepo(p.repo, err)
return
}
default:
}
@@ -187,13 +194,17 @@ func (p *puller) runRO() {
if debugPull {
dlog.Printf("%q: time for rescan", p.repo)
}
p.model.ScanRepo(p.repo)
err := p.model.ScanRepo(p.repo)
if err != nil {
invalidateRepo(p.repo, err)
return
}
}
}
func (p *puller) fixupDirectories() {
var deleteDirs []string
fn := func(path string, info os.FileInfo, err error) error {
filepath.Walk(p.dir, func(path string, info os.FileInfo, err error) error {
if !info.IsDir() {
return nil
}
@@ -242,8 +253,7 @@ func (p *puller) fixupDirectories() {
}
return nil
}
filepath.Walk(p.dir, fn)
})
// Delete any queued directories
for i := len(deleteDirs) - 1; i >= 0; i-- {
@@ -278,54 +288,14 @@ func (p *puller) handleRequestResult(res requestResult) {
}
if of.done && of.outstanding == 0 {
if debugPull {
dlog.Printf("pull: closing %q / %q", p.repo, f.Name)
}
of.file.Close()
defer os.Remove(of.temp)
delete(p.openFiles, f.Name)
fd, err := os.Open(of.temp)
if err != nil {
if debugPull {
dlog.Printf("pull: error: %q / %q: %v", p.repo, f.Name, err)
}
return
}
hb, _ := scanner.Blocks(fd, BlockSize)
fd.Close()
if l0, l1 := len(hb), len(f.Blocks); l0 != l1 {
if debugPull {
dlog.Printf("pull: %q / %q: nblocks %d != %d", p.repo, f.Name, l0, l1)
}
return
}
for i := range hb {
if bytes.Compare(hb[i].Hash, f.Blocks[i].Hash) != 0 {
dlog.Printf("pull: %q / %q: block %d hash mismatch", p.repo, f.Name, i)
return
}
}
t := time.Unix(f.Modified, 0)
os.Chtimes(of.temp, t, t)
os.Chmod(of.temp, os.FileMode(f.Flags&0777))
defTempNamer.Show(of.temp)
if debugPull {
dlog.Printf("pull: rename %q / %q: %q", p.repo, f.Name, of.filepath)
}
if err := Rename(of.temp, of.filepath); err == nil {
p.model.updateLocal(p.repo, f)
} else {
dlog.Printf("pull: error: %q / %q: %v", p.repo, f.Name, err)
}
p.closeFile(f)
}
}
func (p *puller) handleBlock(b bqBlock) {
// handleBlock fulfills the block request by copying, ignoring or fetching
// from the network. Returns true if the block was fully handled
// synchronously, i.e. if the slot can be reused.
func (p *puller) handleBlock(b bqBlock) bool {
f := b.file
// For directories, simply making sure they exist is enough
@@ -336,8 +306,7 @@ func (p *puller) handleBlock(b bqBlock) {
os.MkdirAll(path, 0777)
}
p.model.updateLocal(p.repo, f)
p.requestSlots <- true
return
return true
}
of, ok := p.openFiles[f.Name]
@@ -369,8 +338,7 @@ func (p *puller) handleBlock(b bqBlock) {
if !b.last {
p.openFiles[f.Name] = of
}
p.requestSlots <- true
return
return true
}
defTempNamer.Hide(of.temp)
}
@@ -385,8 +353,7 @@ func (p *puller) handleBlock(b bqBlock) {
delete(p.openFiles, f.Name)
}
p.requestSlots <- true
return
return true
}
p.openFiles[f.Name] = of
@@ -394,15 +361,14 @@ func (p *puller) handleBlock(b bqBlock) {
switch {
case len(b.copy) > 0:
p.handleCopyBlock(b)
p.requestSlots <- true
return true
case b.block.Size > 0:
p.handleRequestBlock(b)
// Request slot gets freed in <-p.blocks case
return p.handleRequestBlock(b)
default:
p.handleEmptyBlock(b)
p.requestSlots <- true
return true
}
}
@@ -450,11 +416,15 @@ func (p *puller) handleCopyBlock(b bqBlock) {
}
}
func (p *puller) handleRequestBlock(b bqBlock) {
// We have a block to get from the network
// handleRequestBlock tries to pull a block from the network. Returns true if
// the block could _not_ be fetched (i.e. it was fully handled, matching the
// return criteria of handleBlock)
func (p *puller) handleRequestBlock(b bqBlock) bool {
f := b.file
of := p.openFiles[f.Name]
of, ok := p.openFiles[f.Name]
if !ok {
panic("bug: request for non-open file")
}
node := p.oustandingPerNode.leastBusyNode(of.availability, p.model.cm)
if len(node) == 0 {
@@ -469,8 +439,7 @@ func (p *puller) handleRequestBlock(b bqBlock) {
} else {
p.openFiles[f.Name] = of
}
p.requestSlots <- true
return
return true
}
of.outstanding++
@@ -491,6 +460,8 @@ func (p *puller) handleRequestBlock(b bqBlock) {
err: err,
}
}(node, b)
return false
}
func (p *puller) handleEmptyBlock(b bqBlock) {
@@ -542,3 +513,52 @@ func (p *puller) queueNeededBlocks() {
dlog.Printf("%q: queued %d blocks", p.repo, queued)
}
}
func (p *puller) closeFile(f scanner.File) {
if debugPull {
dlog.Printf("pull: closing %q / %q", p.repo, f.Name)
}
of := p.openFiles[f.Name]
of.file.Close()
defer os.Remove(of.temp)
delete(p.openFiles, f.Name)
fd, err := os.Open(of.temp)
if err != nil {
if debugPull {
dlog.Printf("pull: error: %q / %q: %v", p.repo, f.Name, err)
}
return
}
hb, _ := scanner.Blocks(fd, BlockSize)
fd.Close()
if l0, l1 := len(hb), len(f.Blocks); l0 != l1 {
if debugPull {
dlog.Printf("pull: %q / %q: nblocks %d != %d", p.repo, f.Name, l0, l1)
}
return
}
for i := range hb {
if bytes.Compare(hb[i].Hash, f.Blocks[i].Hash) != 0 {
dlog.Printf("pull: %q / %q: block %d hash mismatch", p.repo, f.Name, i)
return
}
}
t := time.Unix(f.Modified, 0)
os.Chtimes(of.temp, t, t)
os.Chmod(of.temp, os.FileMode(f.Flags&0777))
defTempNamer.Show(of.temp)
if debugPull {
dlog.Printf("pull: rename %q / %q: %q", p.repo, f.Name, of.filepath)
}
if err := Rename(of.temp, of.filepath); err == nil {
p.model.updateLocal(p.repo, f)
} else {
dlog.Printf("pull: error: %q / %q: %v", p.repo, f.Name, err)
}
}

View File

@@ -0,0 +1,146 @@
// +build !windows
package main
import (
"archive/tar"
"compress/gzip"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/http"
"os"
"path"
"path/filepath"
"runtime"
"strings"
"bitbucket.org/kardianos/osext"
)
type githubRelease struct {
Tag string `json:"tag_name"`
Prelease bool `json:"prerelease"`
Assets []githubAsset `json:"assets"`
}
type githubAsset struct {
URL string `json:"url"`
Name string `json:"name"`
}
func upgrade() error {
path, err := osext.Executable()
if err != nil {
return err
}
resp, err := http.Get("https://api.github.com/repos/calmh/syncthing/releases?per_page=1")
if err != nil {
return err
}
var rels []githubRelease
json.NewDecoder(resp.Body).Decode(&rels)
resp.Body.Close()
if len(rels) != 1 {
return fmt.Errorf("Unexpected number of releases: %d", len(rels))
}
rel := rels[0]
if rel.Tag > Version {
infof("Attempting upgrade to %s...", rel.Tag)
} else if rel.Tag == Version {
okf("Already running the latest version, %s. Not upgrading.", Version)
return nil
} else {
okf("Current version %s is newer than latest release %s. Not upgrading.", Version, rel.Tag)
return nil
}
expectedRelease := fmt.Sprintf("syncthing-%s-%s-%s.", runtime.GOOS, runtime.GOARCH, rel.Tag)
for _, asset := range rel.Assets {
if strings.HasPrefix(asset.Name, expectedRelease) {
if strings.HasSuffix(asset.Name, ".tar.gz") {
infof("Downloading %s...", asset.Name)
fname, err := readTarGZ(asset.URL, filepath.Dir(path))
if err != nil {
return err
}
old := path + "." + Version
err = os.Rename(path, old)
if err != nil {
return err
}
err = os.Rename(fname, path)
if err != nil {
return err
}
okf("Upgraded %q to %s.", path, rel.Tag)
okf("Previous version saved in %q.", old)
return nil
}
}
}
return nil
}
func readTarGZ(url string, dir string) (string, error) {
req, err := http.NewRequest("GET", url, nil)
if err != nil {
return "", err
}
req.Header.Add("Accept", "application/octet-stream")
resp, err := http.DefaultClient.Do(req)
if err != nil {
return "", err
}
defer resp.Body.Close()
gr, err := gzip.NewReader(resp.Body)
if err != nil {
return "", err
}
tr := tar.NewReader(gr)
if err != nil {
return "", err
}
// Iterate through the files in the archive.
for {
hdr, err := tr.Next()
if err == io.EOF {
// end of tar archive
break
}
if err != nil {
return "", err
}
if path.Base(hdr.Name) == "syncthing" {
of, err := ioutil.TempFile(dir, "syncthing")
if err != nil {
return "", err
}
io.Copy(of, tr)
err = of.Close()
if err != nil {
os.Remove(of.Name())
return "", err
}
os.Chmod(of.Name(), os.FileMode(hdr.Mode))
return of.Name(), nil
}
}
return "", fmt.Errorf("No upgrade found")
}

View File

@@ -0,0 +1,9 @@
// +build windows
package main
import "errors"
func upgrade() error {
return errors.New("Upgrade currently unsupported on Windows")
}

View File

@@ -10,32 +10,6 @@ import (
"github.com/calmh/syncthing/scanner"
)
func MetricPrefix(n int64) string {
if n > 1e9 {
return fmt.Sprintf("%.02f G", float64(n)/1e9)
}
if n > 1e6 {
return fmt.Sprintf("%.02f M", float64(n)/1e6)
}
if n > 1e3 {
return fmt.Sprintf("%.01f k", float64(n)/1e3)
}
return fmt.Sprintf("%d ", n)
}
func BinaryPrefix(n int64) string {
if n > 1<<30 {
return fmt.Sprintf("%.02f Gi", float64(n)/(1<<30))
}
if n > 1<<20 {
return fmt.Sprintf("%.02f Mi", float64(n)/(1<<20))
}
if n > 1<<10 {
return fmt.Sprintf("%.01f Ki", float64(n)/(1<<10))
}
return fmt.Sprintf("%d ", n)
}
func Rename(from, to string) error {
if runtime.GOOS == "windows" {
err := os.Remove(to)

View File

@@ -6,7 +6,6 @@ import (
"fmt"
"log"
"net"
"strings"
"sync"
"time"
@@ -76,6 +75,20 @@ func (d *Discoverer) ExtAnnounceOK() bool {
return d.extAnnounceOK
}
func (d *Discoverer) Lookup(node string) []string {
d.registryLock.Lock()
addr, ok := d.registry[node]
d.registryLock.Unlock()
if ok {
return addr
} else if len(d.extServer) != 0 {
// We might want to cache this, but not permanently so it needs some intelligence
return d.externalLookup(node)
}
return nil
}
func (d *Discoverer) announcementPkt() []byte {
var addrs []Address
for _, astr := range d.listenAddrs {
@@ -203,7 +216,7 @@ func (d *Discoverer) recvAnnouncements() {
for _, a := range pkt.Addresses {
var nodeAddr string
if len(a.IP) > 0 {
nodeAddr = fmt.Sprintf("%s:%d", ipStr(a.IP), a.Port)
nodeAddr = fmt.Sprintf("%s:%d", net.IP(a.IP), a.Port)
} else {
ua := addr.(*net.UDPAddr)
ua.Port = int(a.Port)
@@ -285,39 +298,8 @@ func (d *Discoverer) externalLookup(node string) []string {
var addrs []string
for _, a := range pkt.Addresses {
var nodeAddr string
if len(a.IP) > 0 {
nodeAddr = fmt.Sprintf("%s:%d", ipStr(a.IP), a.Port)
}
nodeAddr := fmt.Sprintf("%s:%d", net.IP(a.IP), a.Port)
addrs = append(addrs, nodeAddr)
}
return addrs
}
func (d *Discoverer) Lookup(node string) []string {
d.registryLock.Lock()
addr, ok := d.registry[node]
d.registryLock.Unlock()
if ok {
return addr
} else if len(d.extServer) != 0 {
// We might want to cache this, but not permanently so it needs some intelligence
return d.externalLookup(node)
}
return nil
}
func ipStr(ip []byte) string {
var f = "%d"
var s = "."
if len(ip) > 4 {
f = "%x"
s = ":"
}
var ss = make([]string, len(ip))
for i := range ip {
ss[i] = fmt.Sprintf(f, ip[i])
}
return strings.Join(ss, s)
}

View File

@@ -108,8 +108,8 @@ func (m *Set) Need(id uint) []scanner.File {
if debug {
dlog.Printf("Need(%d)", id)
}
var fs = make([]scanner.File, 0, len(m.globalKey)/2) // Just a guess, but avoids too many reallocations
m.Lock()
var fs = make([]scanner.File, 0, len(m.globalKey)/2) // Just a guess, but avoids too many reallocations
rkID := m.remoteKey[id]
for gk, gf := range m.files {
if !gf.Global {
@@ -145,8 +145,8 @@ func (m *Set) Global() []scanner.File {
if debug {
dlog.Printf("Global()")
}
var fs = make([]scanner.File, 0, len(m.globalKey))
m.Lock()
var fs = make([]scanner.File, 0, len(m.globalKey))
for _, file := range m.files {
if file.Global {
fs = append(fs, file.File)

1
gui/angular.min.js vendored
View File

@@ -200,4 +200,3 @@ isolateScope:Ea.isolateScope,controller:Ea.controller,injector:Ea.injector,inher
$$csp:Tb});Ta=Uc(Z);try{Ta("ngLocale")}catch(c){Ta("ngLocale",[]).provider("$locale",rd)}Ta("ng",["ngLocale"],["$provide",function(a){a.provider({$$sanitizeUri:Bd});a.provider("$compile",ic).directive({a:Wd,input:Lc,textarea:Lc,form:Xd,script:De,select:Ge,style:Ie,option:He,ngBind:he,ngBindHtml:je,ngBindTemplate:ie,ngClass:ke,ngClassEven:me,ngClassOdd:le,ngCloak:ne,ngController:oe,ngForm:Yd,ngHide:xe,ngIf:pe,ngInclude:qe,ngInit:se,ngNonBindable:te,ngPluralize:ue,ngRepeat:ve,ngShow:we,ngStyle:ye,ngSwitch:ze,
ngSwitchWhen:Ae,ngSwitchDefault:Be,ngOptions:Fe,ngTransclude:Ce,ngModel:ce,ngList:ee,ngChange:de,required:Mc,ngRequired:Mc,ngValue:ge}).directive({ngInclude:re}).directive(Nb).directive(Nc);a.provider({$anchorScroll:cd,$animate:Td,$browser:ed,$cacheFactory:fd,$controller:id,$document:jd,$exceptionHandler:kd,$filter:Ac,$interpolate:pd,$interval:qd,$http:ld,$httpBackend:nd,$location:td,$log:ud,$parse:xd,$rootScope:Ad,$q:yd,$sce:Ed,$sceDelegate:Dd,$sniffer:Fd,$templateCache:gd,$timeout:Gd,$window:Hd})}])})(Na);
A(Q).ready(function(){Sc(Q,Yb)})})(window,document);!angular.$$csp()&&angular.element(document).find("head").prepend('<style type="text/css">@charset "UTF-8";[ng\\:cloak],[ng-cloak],[data-ng-cloak],[x-ng-cloak],.ng-cloak,.x-ng-cloak,.ng-hide{display:none !important;}ng\\:form{display:block;}</style>');
//# sourceMappingURL=angular.min.js.map

View File

@@ -4,6 +4,7 @@
'use strict';
var syncthing = angular.module('syncthing', []);
var urlbase = 'rest';
syncthing.controller('SyncthingCtrl', function ($scope, $http) {
var prevDate = 0;
@@ -43,10 +44,12 @@ syncthing.controller('SyncthingCtrl', function ($scope, $http) {
function getSucceeded() {
if (!getOK) {
$scope.init();
$('#networkError').modal('hide');
getOK = true;
}
if (restarting) {
$scope.init();
$('#restarting').modal('hide');
restarting = false;
}
@@ -74,19 +77,26 @@ syncthing.controller('SyncthingCtrl', function ($scope, $http) {
return a.NodeID > b.NodeID;
}
function repoCompare(a, b) {
if (a.Directory < b.Directory) {
return -1;
}
return a.Directory > b.Directory;
}
$scope.refresh = function () {
$http.get('/rest/system').success(function (data) {
$http.get(urlbase + '/system').success(function (data) {
getSucceeded();
$scope.system = data;
}).error(function () {
getFailed();
});
$scope.repos.forEach(function (repo) {
$http.get('/rest/model?repo=' + encodeURIComponent(repo.ID)).success(function (data) {
$http.get(urlbase + '/model?repo=' + encodeURIComponent(repo.ID)).success(function (data) {
$scope.model[repo.ID] = data;
});
});
$http.get('/rest/connections').success(function (data) {
$http.get(urlbase + '/connections').success(function (data) {
var now = Date.now(),
td = (now - prevDate) / 1000,
id;
@@ -111,7 +121,7 @@ syncthing.controller('SyncthingCtrl', function ($scope, $http) {
}
$scope.connections = data;
});
$http.get('/rest/errors').success(function (data) {
$http.get(urlbase + '/errors').success(function (data) {
$scope.errors = data;
});
};
@@ -121,6 +131,10 @@ syncthing.controller('SyncthingCtrl', function ($scope, $http) {
return 'Unknown';
}
if ($scope.model[repo].invalid !== '') {
return 'Stopped';
}
var state = '' + $scope.model[repo].state;
state = state[0].toUpperCase() + state.substr(1);
@@ -136,6 +150,10 @@ syncthing.controller('SyncthingCtrl', function ($scope, $http) {
return 'text-info';
}
if ($scope.model[repo].invalid !== '') {
return 'text-danger';
}
var state = '' + $scope.model[repo].state;
if (state == 'idle') {
return 'text-success';
@@ -238,14 +256,14 @@ syncthing.controller('SyncthingCtrl', function ($scope, $http) {
$scope.saveSettings = function () {
$scope.configInSync = false;
$scope.config.Options.ListenAddress = $scope.config.Options.ListenStr.split(',').map(function (x) { return x.trim(); });
$http.post('/rest/config', JSON.stringify($scope.config), {headers: {'Content-Type': 'application/json'}});
$http.post(urlbase + '/config', JSON.stringify($scope.config), {headers: {'Content-Type': 'application/json'}});
$('#settings').modal("hide");
};
$scope.restart = function () {
restarting = true;
$('#restarting').modal('show');
$http.post('/rest/restart');
$http.post(urlbase + '/restart');
$scope.configInSync = true;
};
@@ -282,7 +300,7 @@ syncthing.controller('SyncthingCtrl', function ($scope, $http) {
}
$scope.configInSync = false;
$http.post('/rest/config', JSON.stringify($scope.config), {headers: {'Content-Type': 'application/json'}});
$http.post(urlbase + '/config', JSON.stringify($scope.config), {headers: {'Content-Type': 'application/json'}});
};
$scope.saveNode = function () {
@@ -309,7 +327,7 @@ syncthing.controller('SyncthingCtrl', function ($scope, $http) {
$scope.nodes.sort(nodeCompare);
$scope.config.Nodes = $scope.nodes;
$http.post('/rest/config', JSON.stringify($scope.config), {headers: {'Content-Type': 'application/json'}});
$http.post(urlbase + '/config', JSON.stringify($scope.config), {headers: {'Content-Type': 'application/json'}});
};
$scope.otherNodes = function () {
@@ -337,7 +355,7 @@ syncthing.controller('SyncthingCtrl', function ($scope, $http) {
$scope.clearErrors = function () {
$scope.seenError = $scope.errors[$scope.errors.length - 1].Time;
$http.post('/rest/error/clear');
$http.post(urlbase + '/error/clear');
};
$scope.friendlyNodes = function (str) {
@@ -393,7 +411,7 @@ syncthing.controller('SyncthingCtrl', function ($scope, $http) {
$scope.config.Repositories = $scope.repos;
$http.post('/rest/config', JSON.stringify($scope.config), {headers: {'Content-Type': 'application/json'}});
$http.post(urlbase + '/config', JSON.stringify($scope.config), {headers: {'Content-Type': 'application/json'}});
};
$scope.deleteRepo = function () {
@@ -409,35 +427,38 @@ syncthing.controller('SyncthingCtrl', function ($scope, $http) {
$scope.config.Repositories = $scope.repos;
$scope.configInSync = false;
$http.post('/rest/config', JSON.stringify($scope.config), {headers: {'Content-Type': 'application/json'}});
$http.post(urlbase + '/config', JSON.stringify($scope.config), {headers: {'Content-Type': 'application/json'}});
};
$http.get('/rest/version').success(function (data) {
$scope.version = data;
});
$scope.init = function() {
$http.get(urlbase + '/version').success(function (data) {
$scope.version = data;
});
$http.get('/rest/system').success(function (data) {
$scope.system = data;
$scope.myID = data.myID;
});
$http.get(urlbase + '/system').success(function (data) {
$scope.system = data;
$scope.myID = data.myID;
});
$http.get('/rest/config').success(function (data) {
$scope.config = data;
$scope.config.Options.ListenStr = $scope.config.Options.ListenAddress.join(', ');
$http.get(urlbase + '/config').success(function (data) {
$scope.config = data;
$scope.config.Options.ListenStr = $scope.config.Options.ListenAddress.join(', ');
var nodes = $scope.config.Nodes;
nodes.sort(nodeCompare);
$scope.nodes = nodes;
$scope.nodes = $scope.config.Nodes;
$scope.nodes.sort(nodeCompare);
$scope.repos = $scope.config.Repositories;
$scope.repos = $scope.config.Repositories;
$scope.repos.sort(repoCompare);
$scope.refresh();
});
$scope.refresh();
});
$http.get('/rest/config/sync').success(function (data) {
$scope.configInSync = data.configInSync;
});
$http.get(urlbase + '/config/sync').success(function (data) {
$scope.configInSync = data.configInSync;
});
};
$scope.init();
setInterval($scope.refresh, 10000);
});

View File

@@ -134,6 +134,7 @@
<ul class="list-unstyled" ng-repeat="repo in repos">
<li>
<span class="text-monospace">{{repo.Directory}}</span>
<span ng-if="model[repo.ID].invalid" class="label label-danger">{{model[repo.ID].invalid}}</span>
<ul class="list-no-bullet">
<li>
<div class="li-column" title="Repository ID">
@@ -360,8 +361,9 @@
<form role="form">
<div class="form-group">
<label for="nodeID">Node ID</label>
<input ng-disabled="editingExisting" id="nodeID" class="form-control" type="text" ng-model="currentNode.NodeID"></input>
<p class="help-block">The node ID can be found in the logs or in the "Add Node" dialog on the other node.</p>
<input ng-if="!editingExisting" id="nodeID" class="form-control" type="text" ng-model="currentNode.NodeID"></input>
<div ng-if="editingExisting" class="well well-sm">{{currentNode.NodeID}}</div>
<p class="help-block">The node ID can be found in the "Add Node" dialog on the other node.</p>
</div>
<div class="form-group">
<label for="name">Name</label>
@@ -413,9 +415,10 @@
<div class="form-group">
<div class="checkbox">
<label>
<input type="checkbox" ng-model="currentRepo.ReadOnly"> Read Only
<input type="checkbox" ng-model="currentRepo.ReadOnly"> Repository Master
</label>
</div>
<p class="help-block">Files are protected from changes made on other nodes, but changes made on <em>this</em> node will be sent to the rest of the cluster.</p>
</div>
<div class="form-group">
<label for="nodes">Nodes</label>

View File

File diff suppressed because one or more lines are too long

View File

@@ -9,6 +9,9 @@
<node id="373HSRPQLPNLIJYKZVQFP4PKZ6R2ZE6K3YD442UJHBGBQGWWXAHA" name="s3">
<address>127.0.0.1:22003</address>
</node>
<node id="EJHMPAQOGCVORISB4IS3SYYVJXTKJGLTU66DIQPGJ5D2GXGQ3OWQ" name="s4">
<address>127.0.0.1:22004</address>
</node>
</repository>
<repository id="s12" directory="s12-1">
<node id="I6KAH7666SLLL5PFXSOAUFJCDZYAOMLEKCP2GB3BV5RQST3PSROA" name="s1">

23
integration/h4/cert.pem Normal file
View File

@@ -0,0 +1,23 @@
-----BEGIN CERTIFICATE-----
MIID3jCCAkigAwIBAgIBADALBgkqhkiG9w0BAQswFDESMBAGA1UEAxMJc3luY3Ro
aW5nMB4XDTE0MDUxMDAwNTM0N1oXDTQ5MTIzMTIzNTk1OVowFDESMBAGA1UEAxMJ
c3luY3RoaW5nMIIBojANBgkqhkiG9w0BAQEFAAOCAY8AMIIBigKCAYEA9MRyBtAr
Sjt29azNoCWxx5xZF3RodBcQu+wv5sRR8lWozrr4brfUJLslcQHowqaAprOU1NP+
BH12P5CSymsUrwAmCwSQ54CimXrNi5RiNMl7dtInJksk4Kp6nJgfyR7TqeQgqxtv
+skVWdJY7ptxqpVuDfkf1JnNr68dbANw8hEJpPaGm3qOt81YvSg37R75HiOCzv+h
FcSjKpPyFMvPARMCOHuZS0fYRJtI5nwmR0mWtKfnH/2204YNiQUne/8h2fgtkpxy
OjxKOs2KJxbmpV6Uur/YyGyinb5+Aa0df3KCBuZmE+i/AsZcTsk0fgefe+bshWG/
hzrNfV0wsX3TYjYOSBJ04+f/uQW00G1GGSxPwTsShGqVuwfJkTqkjAXX5wcH+PgJ
ewG/dyMzKklMg19Y65WkhpWa/19o2KSZNw6TO8YM1arwT0STcMc+4fdrVB09lX6q
NJA8UL8hUX+jbKBzatDY64h1d9E8PE0ODHYgYFO2Ko7e2GnWCQeijGmnAgMBAAGj
PzA9MA4GA1UdDwEB/wQEAwIAoDAdBgNVHSUEFjAUBggrBgEFBQcDAQYIKwYBBQUH
AwIwDAYDVR0TAQH/BAIwADALBgkqhkiG9w0BAQsDggGBANFiHcATP5Lm11o65wbh
sKk7yteTapRohMoLNdW44YNyM8ZkELnrdNY8pe3CWSGy3spBH01+4jbUT+gSltQr
KTLVxSZ7f91696Og5ag4BQCeFY6ghKD/G9+PlBSj6yb3Y98NZsx8huLfylH+XuJw
2gP5Nqov4uXaKgYylx2gdaeCb2M+wM/br1DO2HCPCmgbZE5g8RM5JxzojGn/41Le
IbCd39zdI6NKj9c7T1Bxmt20uzca4nRgXVVzJymedEoF+//sBRk6PQzqgjgn/r3S
h9vrqo5j8ly/+ojFjBaVY7gq2XHM6/q0LTjeKkv2MUQw+vEEZX65GpBOgBZ8U0Wb
/NMUUhhDjGE/0G6TCJgq/HdkjmsNaWjO5sWjhnwXNImYXBdH4OenhXIrHcLhcnxN
2n5sPkDc6n0LVVV7VAjBPXcTmu2uOSK02yqNZLLWJygp1Wl6lbiqLS3bJgYrUv2m
YkRaR+IqVPw5EPs/QlH0qLBeCyIasaSWUVZeitVwRmqIUA==
-----END CERTIFICATE-----

36
integration/h4/config.xml Normal file
View File

@@ -0,0 +1,36 @@
<configuration version="2">
<repository id="unique" directory="s4" ro="false">
<node id="I6KAH7666SLLL5PFXSOAUFJCDZYAOMLEKCP2GB3BV5RQST3PSROA" name="s1"></node>
<node id="JMFJCXBGZDE4BOCJE3VF65GYZNAIVJRET3J6HMRAUQIGJOFKNHMQ" name="s2"></node>
<node id="373HSRPQLPNLIJYKZVQFP4PKZ6R2ZE6K3YD442UJHBGBQGWWXAHA" name="s3"></node>
<node id="EJHMPAQOGCVORISB4IS3SYYVJXTKJGLTU66DIQPGJ5D2GXGQ3OWQ" name="s4"></node>
</repository>
<node id="I6KAH7666SLLL5PFXSOAUFJCDZYAOMLEKCP2GB3BV5RQST3PSROA" name="s1">
<address>127.0.0.1:22001</address>
</node>
<node id="JMFJCXBGZDE4BOCJE3VF65GYZNAIVJRET3J6HMRAUQIGJOFKNHMQ" name="s2">
<address>127.0.0.1:22002</address>
</node>
<node id="373HSRPQLPNLIJYKZVQFP4PKZ6R2ZE6K3YD442UJHBGBQGWWXAHA" name="s3">
<address>127.0.0.1:22003</address>
</node>
<node id="EJHMPAQOGCVORISB4IS3SYYVJXTKJGLTU66DIQPGJ5D2GXGQ3OWQ" name="s4">
<address>dynamic</address>
</node>
<gui enabled="true">
<address>127.0.0.1:8084</address>
</gui>
<options>
<listenAddress>:22004</listenAddress>
<globalAnnounceServer>announce.syncthing.net:22025</globalAnnounceServer>
<globalAnnounceEnabled>false</globalAnnounceEnabled>
<localAnnounceEnabled>true</localAnnounceEnabled>
<parallelRequests>16</parallelRequests>
<maxSendKbps>0</maxSendKbps>
<rescanIntervalS>60</rescanIntervalS>
<reconnectionIntervalS>10</reconnectionIntervalS>
<maxChangeKbps>1000</maxChangeKbps>
<startBrowser>false</startBrowser>
<upnpEnabled>false</upnpEnabled>
</options>
</configuration>

39
integration/h4/key.pem Normal file
View File

@@ -0,0 +1,39 @@
-----BEGIN RSA PRIVATE KEY-----
MIIG5AIBAAKCAYEA9MRyBtArSjt29azNoCWxx5xZF3RodBcQu+wv5sRR8lWozrr4
brfUJLslcQHowqaAprOU1NP+BH12P5CSymsUrwAmCwSQ54CimXrNi5RiNMl7dtIn
Jksk4Kp6nJgfyR7TqeQgqxtv+skVWdJY7ptxqpVuDfkf1JnNr68dbANw8hEJpPaG
m3qOt81YvSg37R75HiOCzv+hFcSjKpPyFMvPARMCOHuZS0fYRJtI5nwmR0mWtKfn
H/2204YNiQUne/8h2fgtkpxyOjxKOs2KJxbmpV6Uur/YyGyinb5+Aa0df3KCBuZm
E+i/AsZcTsk0fgefe+bshWG/hzrNfV0wsX3TYjYOSBJ04+f/uQW00G1GGSxPwTsS
hGqVuwfJkTqkjAXX5wcH+PgJewG/dyMzKklMg19Y65WkhpWa/19o2KSZNw6TO8YM
1arwT0STcMc+4fdrVB09lX6qNJA8UL8hUX+jbKBzatDY64h1d9E8PE0ODHYgYFO2
Ko7e2GnWCQeijGmnAgMBAAECggGBAIjKaLdqC2d3CCqQonJH3q0hsaCsC9wlL9L2
UmbzfKCkQq0WTNUDo2nLtUcMvBpclzWS0zCGMUYtH7Kyh3bclTigKqKpsJnQiA6i
VNEW4jOCDp//HqYGBNwSKmftlIX/1mbx+VfnA5PyYR5LsivXb5TX4iOpAKL+Obdf
dF/zJGIEJ5GrvNqTicMq3dcI7Qh18N9pFSe+MTZLKK0Y9Yetx0hgaTNL0AYEZtcg
uYMmCvZ4J+Namo6EanKYTmQvHzvq/tZVMvud9Gcr6uKKtVBcgex9S/R7IicaKg78
oDTgH0nDrpI55pZCX8vuVGk8nVTXXLTsMR1XojOpiYjS6ucfTkPEw3fOW/YRhHg5
93TrdDiWkqSWube5LNUF87q65t/aw/y2EH2aTNqcPD5OQ+EZRS8OGYPqOrJ4Ycbp
j6CMSE+LX2IDMQyJ+9J0vPHtFsAviBKQkPoQ1L6mvhJuw6ksy34NQGykNDHz7nQK
SeqvCJ6XCtaWNkq+00lC3UFaGsjuUQKBwQD8+y370co5G7G5GDLbLE3i+pguUN7L
5YfDj5qqsM9hOJNqeKAHrKFP2ii0F9WxGw/ruY0k8k7zUt6LepgwkCI5BYfckRKJ
g8YsNTizjqPLRGtiqL9Garjo+xPxFGj+TkTg9fYD4xTWFa1I15zzCu7Ye7xObeEH
LRtcm3R4fU54JDrKtKDccoQmTEAzsxRdNXi9ifc7qgjGBH9W02guuGPY4ltT1aZR
bcO5vpi44Fnl2h6d7N6iwCtFJ0CaT1pAZ4UCgcEA97Asf5DTDWKByZBhk+VvuT1b
6nMYjqKxDNMmCaomCmk8Mif0w9SEJmAg0b/gbs/H6T78a+9WjbN5q9xHcDU91uax
TdCenTq7H981AjgUG7OA7XwYn+AKy+hGSnsTJglMJzJm6TGt+Sq0oO9EahBRDlsP
PiQRot2gyQfubwcl3rhdErRwaCM92BUyPkC2fy2OppAeZOOxxuzxrvHflDOuDGCZ
KPCmy6U9HV0JOAO2FSNJeZdNLBixXa1Pk8TgbLY7AoHBAPG7lhn9Qg3Fz9H9NINH
13jfWdFQB0SwJEWTEAiwgMj2ha6Eau5KX63s2V4VNGVSZakqmZtHSneppOuEjq5A
2+K+zS7PFPaACzos9OxmjU7rJu2UL4m66sv9NvXzOcxev+RyQs0+DKfw+K8VEG0Q
8l+8BJiw2AjCalXYWbfUjMmyXNdbOCbN6kaqL+L26KuUL7Z1gd/qPw3wODmgMvoJ
yabxzLDUA2PlzdPMMyTdhCllfkILmEXN+MrQkiOhVa0a/QKBwGZjAhH9ePD4fnQm
5d8wIb3uGlfRGh6kLBIEGp42IqF9HPASykBFUhdW91odOhY0eAv4CHpJpnrO7QXY
+gLtT1HNbQ+gpGCUTZQAPbZcHhvRWQNSoA8+mtftfVj+hUzc3Qj68cWFzsfIGoDI
R3ycoBUSGTvzxwKPIQ7Y43wr9UCa74Zy5mB16POw12MadxYda/F4c8f6w5taiRFr
VKO7tT/Skp101U4rURcZRV1NU3BrdMz5eWI4FuGFafbIlIj7zwKBwHCt3VQt+JmZ
OhCJR+8Q+jT0JvnMu1zi4CcMRiT8FbNdZDY/3B0wG4ySTNrEikFzIjihF4zIp2nv
nD3qKQs+THl51GA8AnP9bNk7hknD7rXUuScndccTW58+PGrjqfwJp/1MEeOJQpoX
0JML1w+dIKHzsKN0X6UL7Gyq8m+0SJKmQQguan3d3M8CMpnW0srgqOfJ+q1+bz8b
6FuJeijoaN8+zyKkN+9R91Erw5pk+7vJRzEpDtkhprEE5tLNDKrXJw==
-----END RSA PRIVATE KEY-----

View File

@@ -14,7 +14,7 @@ go build json.go
start() {
echo "Starting..."
for i in 1 2 3 ; do
for i in 1 2 3 4 ; do
STPROFILER=":909$i" syncthing -home "h$i" &
done
}

View File

@@ -45,6 +45,9 @@ func (b *Beacon) run() {
if err != nil {
log.Fatal(err)
}
if debug {
dlog.Printf("trying %d interfaces", len(intfs))
}
for _, intf := range intfs {
intf := intf
@@ -55,10 +58,13 @@ func (b *Beacon) run() {
conn, err := net.ListenMulticastUDP("udp4", &intf, group)
if err != nil {
if debug {
dlog.Printf("listen for multicast group on %q: %v", intf.Name, err)
dlog.Printf("failed to listen for multicast group on %q: %v", intf.Name, err)
}
} else {
b.conns = append(b.conns, conn)
if debug {
dlog.Printf("listening for multicast group on %q", intf.Name)
}
}
}
@@ -72,6 +78,9 @@ func (b *Beacon) run() {
dlog.Println(err)
return
}
if debug {
dlog.Printf("recv %d bytes from %s on %v", n, addr, conn)
}
b.outbox <- recv{bs[:n], addr}
}
}()
@@ -85,6 +94,9 @@ func (b *Beacon) run() {
dlog.Println(err)
return
}
if debug {
dlog.Printf("sent %d bytes to %s on %v", len(bs), group, conn)
}
}
}
}()

View File

@@ -9,7 +9,6 @@ import (
"sync"
"time"
"github.com/calmh/syncthing/buffers"
"github.com/calmh/syncthing/xdr"
)
@@ -64,24 +63,26 @@ type Connection interface {
}
type rawConnection struct {
sync.RWMutex
id string
receiver Model
reader io.ReadCloser
cr *countingReader
xr *xdr.Reader
writer io.WriteCloser
cw *countingWriter
wb *bufio.Writer
xw *xdr.Writer
wmut sync.Mutex
id string
receiver Model
reader io.ReadCloser
cr *countingReader
xr *xdr.Reader
writer io.WriteCloser
cw *countingWriter
wb *bufio.Writer
xw *xdr.Writer
closed chan struct{}
awaiting map[int]chan asyncResult
nextID int
indexSent map[string]map[string][2]int64
awaiting []chan asyncResult
imut sync.Mutex
hasSentIndex bool
hasRecvdIndex bool
nextID chan int
outbox chan []encodable
closed chan struct{}
}
type asyncResult struct {
@@ -90,7 +91,7 @@ type asyncResult struct {
}
const (
pingTimeout = 2 * time.Minute
pingTimeout = 4 * time.Minute
pingIdleTime = 5 * time.Minute
)
@@ -115,13 +116,17 @@ func NewConnection(nodeID string, reader io.Reader, writer io.Writer, receiver M
cw: cw,
wb: wb,
xw: xdr.NewWriter(wb),
closed: make(chan struct{}),
awaiting: make(map[int]chan asyncResult),
awaiting: make([]chan asyncResult, 0x1000),
indexSent: make(map[string]map[string][2]int64),
outbox: make(chan []encodable),
nextID: make(chan int),
closed: make(chan struct{}),
}
go c.readerLoop()
go c.writerLoop()
go c.pingerLoop()
go c.idGenerator()
return wireFormatConnection{&c}
}
@@ -132,11 +137,7 @@ func (c *rawConnection) ID() string {
// Index writes the list of file information to the connected peer node
func (c *rawConnection) Index(repo string, idx []FileInfo) {
c.Lock()
if c.isClosed() {
c.Unlock()
return
}
c.imut.Lock()
var msgType int
if c.indexSent[repo] == nil {
// This is the first time we send an index.
@@ -158,46 +159,33 @@ func (c *rawConnection) Index(repo string, idx []FileInfo) {
}
idx = diff
}
c.imut.Unlock()
header{0, c.nextID, msgType}.encodeXDR(c.xw)
_, err := IndexMessage{repo, idx}.encodeXDR(c.xw)
if err == nil {
err = c.flush()
}
c.nextID = (c.nextID + 1) & 0xfff
c.hasSentIndex = true
c.Unlock()
if err != nil {
c.close(err)
return
}
c.send(header{0, -1, msgType}, IndexMessage{repo, idx})
}
// Request returns the bytes for the specified block after fetching them from the connected peer.
func (c *rawConnection) Request(repo string, name string, offset int64, size int) ([]byte, error) {
c.Lock()
if c.isClosed() {
c.Unlock()
var id int
select {
case id = <-c.nextID:
case <-c.closed:
return nil, ErrClosed
}
rc := make(chan asyncResult)
if _, ok := c.awaiting[c.nextID]; ok {
c.imut.Lock()
if ch := c.awaiting[id]; ch != nil {
panic("id taken")
}
c.awaiting[c.nextID] = rc
header{0, c.nextID, messageTypeRequest}.encodeXDR(c.xw)
_, err := RequestMessage{repo, name, uint64(offset), uint32(size)}.encodeXDR(c.xw)
if err == nil {
err = c.flush()
rc := make(chan asyncResult)
c.awaiting[id] = rc
c.imut.Unlock()
ok := c.send(header{0, id, messageTypeRequest},
RequestMessage{repo, name, uint64(offset), uint32(size)})
if !ok {
return nil, ErrClosed
}
if err != nil {
c.Unlock()
c.close(err)
return nil, err
}
c.nextID = (c.nextID + 1) & 0xfff
c.Unlock()
res, ok := <-rc
if !ok {
@@ -208,225 +196,276 @@ func (c *rawConnection) Request(repo string, name string, offset int64, size int
// ClusterConfig send the cluster configuration message to the peer and returns any error
func (c *rawConnection) ClusterConfig(config ClusterConfigMessage) {
c.Lock()
defer c.Unlock()
if c.isClosed() {
return
}
header{0, c.nextID, messageTypeClusterConfig}.encodeXDR(c.xw)
c.nextID = (c.nextID + 1) & 0xfff
_, err := config.encodeXDR(c.xw)
if err == nil {
err = c.flush()
}
if err != nil {
c.close(err)
}
c.send(header{0, -1, messageTypeClusterConfig}, config)
}
func (c *rawConnection) ping() bool {
c.Lock()
if c.isClosed() {
c.Unlock()
var id int
select {
case id = <-c.nextID:
case <-c.closed:
return false
}
rc := make(chan asyncResult, 1)
c.awaiting[c.nextID] = rc
header{0, c.nextID, messageTypePing}.encodeXDR(c.xw)
err := c.flush()
if err != nil {
c.Unlock()
c.close(err)
return false
} else if c.xw.Error() != nil {
c.Unlock()
c.close(c.xw.Error())
c.imut.Lock()
c.awaiting[id] = rc
c.imut.Unlock()
ok := c.send(header{0, id, messageTypePing})
if !ok {
return false
}
c.nextID = (c.nextID + 1) & 0xfff
c.Unlock()
res, ok := <-rc
return ok && res.err == nil
}
func (c *rawConnection) readerLoop() (err error) {
defer func() {
c.close(err)
}()
for {
select {
case <-c.closed:
return ErrClosed
default:
}
var hdr header
hdr.decodeXDR(c.xr)
if err := c.xr.Error(); err != nil {
return err
}
if hdr.version != 0 {
return fmt.Errorf("protocol error: %s: unknown message version %#x", c.id, hdr.version)
}
switch hdr.msgType {
case messageTypeIndex:
if err := c.handleIndex(); err != nil {
return err
}
case messageTypeIndexUpdate:
if err := c.handleIndexUpdate(); err != nil {
return err
}
case messageTypeRequest:
if err := c.handleRequest(hdr); err != nil {
return err
}
case messageTypeResponse:
if err := c.handleResponse(hdr); err != nil {
return err
}
case messageTypePing:
c.send(header{0, hdr.msgID, messageTypePong})
case messageTypePong:
c.handlePong(hdr)
case messageTypeClusterConfig:
if err := c.handleClusterConfig(); err != nil {
return err
}
default:
return fmt.Errorf("protocol error: %s: unknown message type %#x", c.id, hdr.msgType)
}
}
}
func (c *rawConnection) handleIndex() error {
var im IndexMessage
im.decodeXDR(c.xr)
if err := c.xr.Error(); err != nil {
return err
} else {
// We run this (and the corresponding one for update, below)
// in a separate goroutine to avoid blocking the read loop.
// There is otherwise a potential deadlock where both sides
// has the model locked because it's sending a large index
// update and can't receive the large index update from the
// other side.
go c.receiver.Index(c.id, im.Repository, im.Files)
}
return nil
}
func (c *rawConnection) handleIndexUpdate() error {
var im IndexMessage
im.decodeXDR(c.xr)
if err := c.xr.Error(); err != nil {
return err
} else {
go c.receiver.IndexUpdate(c.id, im.Repository, im.Files)
}
return nil
}
func (c *rawConnection) handleRequest(hdr header) error {
var req RequestMessage
req.decodeXDR(c.xr)
if err := c.xr.Error(); err != nil {
return err
}
go c.processRequest(hdr.msgID, req)
return nil
}
func (c *rawConnection) handleResponse(hdr header) error {
data := c.xr.ReadBytesMax(256 * 1024) // Sufficiently larger than max expected block size
if err := c.xr.Error(); err != nil {
return err
}
go func(hdr header, err error) {
c.imut.Lock()
rc := c.awaiting[hdr.msgID]
c.awaiting[hdr.msgID] = nil
c.imut.Unlock()
if rc != nil {
rc <- asyncResult{data, err}
close(rc)
}
}(hdr, c.xr.Error())
return nil
}
func (c *rawConnection) handlePong(hdr header) {
c.imut.Lock()
if rc := c.awaiting[hdr.msgID]; rc != nil {
go func() {
rc <- asyncResult{}
close(rc)
}()
c.awaiting[hdr.msgID] = nil
}
c.imut.Unlock()
}
func (c *rawConnection) handleClusterConfig() error {
var cm ClusterConfigMessage
cm.decodeXDR(c.xr)
if err := c.xr.Error(); err != nil {
return err
} else {
go c.receiver.ClusterConfig(c.id, cm)
}
return nil
}
type encodable interface {
encodeXDR(*xdr.Writer) (int, error)
}
type encodableBytes []byte
func (e encodableBytes) encodeXDR(xw *xdr.Writer) (int, error) {
return xw.WriteBytes(e)
}
func (c *rawConnection) send(h header, es ...encodable) bool {
if h.msgID < 0 {
select {
case id := <-c.nextID:
h.msgID = id
case <-c.closed:
return false
}
}
msg := append([]encodable{h}, es...)
select {
case c.outbox <- msg:
return true
case <-c.closed:
return false
}
}
func (c *rawConnection) writerLoop() {
var err error
for es := range c.outbox {
c.wmut.Lock()
for _, e := range es {
e.encodeXDR(c.xw)
}
if err = c.flush(); err != nil {
c.wmut.Unlock()
c.close(err)
return
}
c.wmut.Unlock()
}
}
type flusher interface {
Flush() error
}
func (c *rawConnection) flush() error {
c.wb.Flush()
if err := c.xw.Error(); err != nil {
return err
}
if err := c.wb.Flush(); err != nil {
return err
}
if f, ok := c.writer.(flusher); ok {
return f.Flush()
}
return nil
}
func (c *rawConnection) close(err error) {
c.Lock()
c.imut.Lock()
c.wmut.Lock()
defer c.imut.Unlock()
defer c.wmut.Unlock()
select {
case <-c.closed:
c.Unlock()
return
default:
}
close(c.closed)
for _, ch := range c.awaiting {
close(ch)
}
c.awaiting = nil
c.writer.Close()
c.reader.Close()
c.Unlock()
close(c.closed)
c.receiver.Close(c.id, err)
}
for i, ch := range c.awaiting {
if ch != nil {
close(ch)
c.awaiting[i] = nil
}
}
func (c *rawConnection) isClosed() bool {
select {
case <-c.closed:
return true
default:
return false
c.writer.Close()
c.reader.Close()
c.receiver.Close(c.id, err)
}
}
func (c *rawConnection) readerLoop() {
loop:
for !c.isClosed() {
var hdr header
hdr.decodeXDR(c.xr)
if c.xr.Error() != nil {
c.close(c.xr.Error())
break loop
func (c *rawConnection) idGenerator() {
nextID := 0
for {
nextID = (nextID + 1) & 0xfff
select {
case c.nextID <- nextID:
case <-c.closed:
return
}
if hdr.version != 0 {
c.close(fmt.Errorf("protocol error: %s: unknown message version %#x", c.id, hdr.version))
break loop
}
switch hdr.msgType {
case messageTypeIndex:
var im IndexMessage
im.decodeXDR(c.xr)
if c.xr.Error() != nil {
c.close(c.xr.Error())
break loop
} else {
// We run this (and the corresponding one for update, below)
// in a separate goroutine to avoid blocking the read loop.
// There is otherwise a potential deadlock where both sides
// has the model locked because it's sending a large index
// update and can't receive the large index update from the
// other side.
go c.receiver.Index(c.id, im.Repository, im.Files)
}
c.Lock()
c.hasRecvdIndex = true
c.Unlock()
case messageTypeIndexUpdate:
var im IndexMessage
im.decodeXDR(c.xr)
if c.xr.Error() != nil {
c.close(c.xr.Error())
break loop
} else {
go c.receiver.IndexUpdate(c.id, im.Repository, im.Files)
}
case messageTypeRequest:
var req RequestMessage
req.decodeXDR(c.xr)
if c.xr.Error() != nil {
c.close(c.xr.Error())
break loop
}
go c.processRequest(hdr.msgID, req)
case messageTypeResponse:
data := c.xr.ReadBytesMax(256 * 1024) // Sufficiently larger than max expected block size
if c.xr.Error() != nil {
c.close(c.xr.Error())
break loop
}
go func(hdr header, err error) {
c.Lock()
rc, ok := c.awaiting[hdr.msgID]
delete(c.awaiting, hdr.msgID)
c.Unlock()
if ok {
rc <- asyncResult{data, err}
close(rc)
}
}(hdr, c.xr.Error())
case messageTypePing:
c.Lock()
header{0, hdr.msgID, messageTypePong}.encodeXDR(c.xw)
err := c.flush()
c.Unlock()
if err != nil {
c.close(err)
break loop
} else if c.xw.Error() != nil {
c.close(c.xw.Error())
break loop
}
case messageTypePong:
c.RLock()
rc, ok := c.awaiting[hdr.msgID]
c.RUnlock()
if ok {
rc <- asyncResult{}
close(rc)
c.Lock()
delete(c.awaiting, hdr.msgID)
c.Unlock()
}
case messageTypeClusterConfig:
var cm ClusterConfigMessage
cm.decodeXDR(c.xr)
if c.xr.Error() != nil {
c.close(c.xr.Error())
break loop
} else {
go c.receiver.ClusterConfig(c.id, cm)
}
default:
c.close(fmt.Errorf("protocol error: %s: unknown message type %#x", c.id, hdr.msgType))
break loop
}
}
}
func (c *rawConnection) processRequest(msgID int, req RequestMessage) {
data, _ := c.receiver.Request(c.id, req.Repository, req.Name, int64(req.Offset), int(req.Size))
c.Lock()
header{0, msgID, messageTypeResponse}.encodeXDR(c.xw)
_, err := c.xw.WriteBytes(data)
if err == nil {
err = c.flush()
}
c.Unlock()
buffers.Put(data)
if err != nil {
c.close(err)
}
}
@@ -436,29 +475,33 @@ func (c *rawConnection) pingerLoop() {
for {
select {
case <-ticker:
c.RLock()
ready := c.hasRecvdIndex && c.hasSentIndex
c.RUnlock()
if ready {
go func() {
rc <- c.ping()
}()
select {
case ok := <-rc:
if !ok {
c.close(fmt.Errorf("ping failure"))
}
case <-time.After(pingTimeout):
c.close(fmt.Errorf("ping timeout"))
go func() {
rc <- c.ping()
}()
select {
case ok := <-rc:
if !ok {
c.close(fmt.Errorf("ping failure"))
}
case <-time.After(pingTimeout):
c.close(fmt.Errorf("ping timeout"))
case <-c.closed:
return
}
case <-c.closed:
return
}
}
}
func (c *rawConnection) processRequest(msgID int, req RequestMessage) {
data, _ := c.receiver.Request(c.id, req.Repository, req.Name, int64(req.Offset), int(req.Size))
c.send(header{0, msgID, messageTypeResponse},
encodableBytes(data))
}
type Statistics struct {
At time.Time
InBytesTotal int

View File

@@ -174,9 +174,7 @@ func TestClose(t *testing.T) {
c0.close(nil)
if !c0.isClosed() {
t.Fatal("Connection should be closed")
}
<-c0.closed
if !m0.isClosed() {
t.Fatal("Connection should be closed")
}

View File

@@ -2,6 +2,7 @@ package scanner
import (
"bytes"
"errors"
"io/ioutil"
"log"
"os"
@@ -51,12 +52,18 @@ type CurrentFiler interface {
// Walk returns the list of files found in the local repository by scanning the
// file system. Files are blockwise hashed.
func (w *Walker) Walk() (files []File, ignore map[string][]string) {
func (w *Walker) Walk() (files []File, ignore map[string][]string, err error) {
w.lazyInit()
if debug {
dlog.Println("Walk", w.Dir, w.BlockSize, w.IgnoreFile)
}
err = checkDir(w.Dir)
if err != nil {
return
}
t0 := time.Now()
ignore = make(map[string][]string)
@@ -70,6 +77,8 @@ func (w *Walker) Walk() (files []File, ignore map[string][]string) {
d := t1.Sub(t0).Seconds()
dlog.Printf("Walk in %.02f ms, %.0f files/s", d*1000, float64(len(files))/d)
}
err = checkDir(w.Dir)
return
}
@@ -272,3 +281,12 @@ func (w *Walker) ignoreFile(patterns map[string][]string, file string) bool {
}
return false
}
func checkDir(dir string) error {
if info, err := os.Stat(dir); err != nil {
return err
} else if !info.IsDir() {
return errors.New(dir + ": not a directory")
}
return nil
}

View File

@@ -27,7 +27,11 @@ func TestWalk(t *testing.T) {
BlockSize: 128 * 1024,
IgnoreFile: ".stignore",
}
files, ignores := w.Walk()
files, ignores, err := w.Walk()
if err != nil {
t.Fatal(err)
}
if l1, l2 := len(files), len(testdata); l1 != l2 {
t.Fatalf("Incorrect number of walked files %d != %d", l1, l2)
@@ -54,6 +58,30 @@ func TestWalk(t *testing.T) {
}
}
func TestWalkError(t *testing.T) {
w := Walker{
Dir: "testdata-missing",
BlockSize: 128 * 1024,
IgnoreFile: ".stignore",
}
_, _, err := w.Walk()
if err == nil {
t.Error("no error from missing directory")
}
w = Walker{
Dir: "testdata/bar",
BlockSize: 128 * 1024,
IgnoreFile: ".stignore",
}
_, _, err = w.Walk()
if err == nil {
t.Error("no error from non-directory")
}
}
func TestIgnore(t *testing.T) {
var patterns = map[string][]string{
"": {"t2"},