Compare commits

...

23 Commits

Author SHA1 Message Date
Jakob Borg
2d0600de38 Merge pull request #78 from filoozom/patch-1
Update config.go to handle default slices correctly (fixes #76)
2014-03-04 15:09:47 +01:00
filoozom
6a1c055288 Delete comment in config.go 2014-03-04 15:02:26 +01:00
filoozom
b9ec30ebdb Update config.go 2014-03-04 11:29:51 +01:00
filoozom
428164f395 Update config.go to handle default slices correctly 2014-03-04 11:25:10 +01:00
Jakob Borg
ba59e0d3f0 Use undirected broadcast and WriteTo (fixes #75) 2014-03-03 18:19:32 +01:00
Jakob Borg
5d8f0f835e Merge branch 'filoozom-patch-1'
* filoozom-patch-1:
  Fix divided by zero when the sync folder is empty (tot = 0)
  Delete cfgFile before renaming it on Windows
  Set the right config and home dir for each OS
  Update getHomeDir() to use "os/user"
2014-03-03 14:06:15 +01:00
filoozom
b4a1aadd1b Fix divided by zero when the sync folder is empty (tot = 0) 2014-03-03 08:47:52 +01:00
filoozom
8f41d90ab1 Delete cfgFile before renaming it on Windows 2014-03-03 08:46:20 +01:00
Jakob Borg
9743386166 Re-add inadvertently ignored files 2014-03-02 23:58:24 +01:00
Jakob Borg
0afcb5b7e7 Clean up build.sh 2014-03-02 23:55:08 +01:00
filoozom
043dea760f Set the right config and home dir for each OS
Use %AppData%\syncthing for the config files and %USERPROFILE%\Sync as sync folder for Windows.
2014-03-02 23:49:51 +01:00
Jakob Borg
0618e2b9b4 Don't include timestamp in auto generated files 2014-03-02 23:15:56 +01:00
Jakob Borg
3c171d281c Move cmd files into subdir 2014-03-02 23:13:04 +01:00
Jakob Borg
c217b7cd22 Change default announce server to announce.syncthing.net 2014-03-02 17:29:35 +01:00
filoozom
23593c3d20 Update getHomeDir() to use "os/user"
os.Getenv("HOME") doesn't work properly on Windows (and maybe other systems?) and the package "os/user" gives us a convenient way to find the home directory for every OS.
2014-03-02 16:07:12 +01:00
Jakob Borg
192117dc11 Merge branch 'v0.6'
* v0.6:
  Open GUI on startup
2014-03-02 13:08:05 +01:00
Jakob Borg
24b8f9211a Open GUI on startup 2014-03-02 12:52:32 +01:00
Jakob Borg
51788d6f0e Add some support packages 2014-03-01 11:11:37 +01:00
Jakob Borg
ea0bed2238 drone.io badge 2014-02-24 14:06:22 +01:00
Jakob Borg
e2fe57c440 deadcode 2014-02-24 13:34:24 +01:00
Jakob Borg
434a0ccf2a golint 2014-02-24 13:29:30 +01:00
Jakob Borg
e7bf3ac108 go vet 2014-02-24 13:24:03 +01:00
Jakob Borg
c5bdaebf2b Remove spurious debug print 2014-02-23 15:08:15 +01:00
42 changed files with 12530 additions and 11986 deletions

3
.gitignore vendored
View File

@@ -1,3 +1,4 @@
syncthing
syncthing.exe
*.tar.gz
dist
*.zip

View File

@@ -1,4 +1,4 @@
syncthing
syncthing [![Build Status](https://drone.io/github.com/calmh/syncthing/status.png)](https://drone.io/github.com/calmh/syncthing/latest)
=========
This is the `syncthing` project. The following are the project goals:

View File

@@ -18,7 +18,7 @@ cd gui
for f in $(find . -type f) ; do
f="${f#./}"
echo "gr, _ = gzip.NewReader(bytes.NewBuffer([]byte{"
gzip -c $f | od -vt x1 | sed 's/^[0-9a-f]*//' | sed 's/\([0-9a-f][0-9a-f]\)/0x\1,/g'
gzip -n -c $f | od -vt x1 | sed 's/^[0-9a-f]*//' | sed 's/\([0-9a-f][0-9a-f]\)/0x\1,/g'
echo "}))"
echo "data, _ = ioutil.ReadAll(gr)"
echo "Assets[\"$f\"] = data"

View File

File diff suppressed because it is too large Load Diff

118
build.sh
View File

@@ -2,48 +2,90 @@
export COPYFILE_DISABLE=true
distFiles=(README.md LICENSE) # apart from the binary itself
version=$(git describe --always)
buildDir=dist
if [[ $fast != yes ]] ; then
build() {
go build -ldflags "-w -X main.Version $version" ./cmd/syncthing
}
prepare() {
./assets.sh | gofmt > auto/gui.files.go
go get -d
}
test() {
go test ./...
fi
}
if [[ -z $1 ]] ; then
go build -ldflags "-X main.Version $version"
elif [[ $1 == "tar" ]] ; then
go build -ldflags "-X main.Version $version" \
&& mkdir syncthing-dist \
&& cp syncthing README.md LICENSE syncthing-dist \
&& tar zcvf syncthing-dist.tar.gz syncthing-dist \
&& rm -rf syncthing-dist
elif [[ $1 == "all" ]] ; then
rm -rf "$buildDir"
mkdir -p "$buildDir" || exit 1
tarDist() {
name="$1"
mkdir -p "$name"
cp syncthing "${distFiles[@]}" "$name"
tar zcvf "$name.tar.gz" "$name"
rm -rf "$name"
}
export GOARM=7
for os in darwin-amd64 linux-amd64 linux-arm freebsd-amd64 windows-amd64 ; do
echo "$os"
export name="syncthing-$os"
export GOOS=${os%-*}
export GOARCH=${os#*-}
go build -ldflags "-X main.Version $version"
mkdir -p "$name"
cp README.md LICENSE "$name"
case $GOOS in
windows)
cp syncthing.exe "$buildDir/$name.exe"
mv syncthing.exe "$name"
zip -qr "$buildDir/$name.zip" "$name"
;;
*)
cp syncthing "$buildDir/$name"
mv syncthing "$name"
tar zcf "$buildDir/$name.tar.gz" "$name"
;;
esac
rm -r "$name"
done
fi
zipDist() {
name="$1"
mkdir -p "$name"
cp syncthing.exe "${distFiles[@]}" "$name"
zip -r "$name.zip" "$name"
rm -rf "$name"
}
case "$1" in
"")
build
;;
tar)
rm -f *.tar.gz *.zip
prepare
test || exit 1
build
eval $(go env)
name="syncthing-$GOOS-$GOARCH-$version"
tarDist "$name"
;;
all)
rm -f *.tar.gz *.zip
prepare
test || exit 1
export GOARM=7
for os in darwin-amd64 linux-amd64 linux-arm freebsd-amd64 windows-amd64 ; do
export GOOS=${os%-*}
export GOARCH=${os#*-}
build
name="syncthing-$os-$version"
case $GOOS in
windows)
zipDist "$name"
rm -f syncthing.exe
;;
*)
tarDist "$name"
rm -f syncthing
;;
esac
done
;;
upload)
tag=$(git describe)
shopt -s nullglob
for f in *gz *zip ; do
relup calmh/syncthing "$tag" "$f"
done
;;
*)
echo "Unknown build parameter $1"
;;
esac

42
cid/cid.go Normal file
View File

@@ -0,0 +1,42 @@
package cid
type Map struct {
toCid map[string]int
toName []string
}
func NewMap() *Map {
return &Map{
toCid: make(map[string]int),
}
}
func (m *Map) Get(name string) int {
cid, ok := m.toCid[name]
if ok {
return cid
}
// Find a free slot to get a new ID
for i, n := range m.toName {
if n == "" {
m.toName[i] = name
m.toCid[name] = i
return i
}
}
// Add it to the end since we didn't find a free slot
m.toName = append(m.toName, name)
cid = len(m.toName) - 1
m.toCid[name] = cid
return cid
}
func (m *Map) Clear(name string) {
cid, ok := m.toCid[name]
if ok {
m.toName[cid] = ""
delete(m.toCid, name)
}
}

1
cmd/.gitignore vendored Normal file
View File

@@ -0,0 +1 @@
!syncthing

View File

@@ -17,7 +17,7 @@ func Blocks(r io.Reader, blocksize int) ([]Block, error) {
var blocks []Block
var offset int64
for {
lr := &io.LimitedReader{r, int64(blocksize)}
lr := &io.LimitedReader{R: r, N: int64(blocksize)}
hf := sha256.New()
n, err := io.Copy(hf, lr)
if err != nil {

View File

@@ -36,7 +36,7 @@ type OptionsConfiguration struct {
FollowSymlinks bool `xml:"followSymlinks" default:"true" ini:"follow-symlinks"`
GUIEnabled bool `xml:"guiEnabled" default:"true" ini:"gui-enabled"`
GUIAddress string `xml:"guiAddress" default:"127.0.0.1:8080" ini:"gui-address"`
GlobalAnnServer string `xml:"globalAnnounceServer" default:"syncthing.nym.se:22025" ini:"global-announce-server"`
GlobalAnnServer string `xml:"globalAnnounceServer" default:"announce.syncthing.net:22025" ini:"global-announce-server"`
GlobalAnnEnabled bool `xml:"globalAnnounceEnabled" default:"true" ini:"global-announce-enabled"`
LocalAnnEnabled bool `xml:"localAnnounceEnabled" default:"true" ini:"local-announce-enabled"`
ParallelRequests int `xml:"parallelRequests" default:"16" ini:"parallel-requests"`
@@ -46,7 +46,7 @@ type OptionsConfiguration struct {
MaxChangeKbps int `xml:"maxChangeKbps" default:"1000" ini:"max-change-bw"`
}
func setDefaults(data interface{}) error {
func setDefaults(data interface{}, setEmptySlices bool) error {
s := reflect.ValueOf(data).Elem()
t := s.Type()
@@ -56,14 +56,20 @@ func setDefaults(data interface{}) error {
v := tag.Get("default")
if len(v) > 0 {
if f.Kind().String() == "slice" && f.Len() != 0 {
continue
}
switch f.Interface().(type) {
case string:
f.SetString(v)
case []string:
rv := reflect.MakeSlice(reflect.TypeOf([]string{}), 1, 1)
rv.Index(0).SetString(v)
f.Set(rv)
if setEmptySlices {
rv := reflect.MakeSlice(reflect.TypeOf([]string{}), 1, 1)
rv.Index(0).SetString(v)
f.Set(rv)
}
case int:
i, err := strconv.ParseInt(v, 10, 64)
@@ -146,14 +152,16 @@ func uniqueStrings(ss []string) []string {
func readConfigXML(rd io.Reader) (Configuration, error) {
var cfg Configuration
setDefaults(&cfg)
setDefaults(&cfg.Options)
setDefaults(&cfg, false)
setDefaults(&cfg.Options, false)
var err error
if rd != nil {
err = xml.NewDecoder(rd).Decode(&cfg)
}
setDefaults(&cfg.Options, true)
cfg.Options.ListenAddress = uniqueStrings(cfg.Options.ListenAddress)
return cfg, err
}

View File

View File

@@ -272,7 +272,7 @@ func TestFileQueueThreadHandling(t *testing.T) {
close(start)
wg.Wait()
if int(gotTot) != total {
t.Error("Total mismatch; %d != %d", gotTot, total)
t.Errorf("Total mismatch; %d != %d", gotTot, total)
}
}
@@ -283,13 +283,13 @@ func TestDeleteAt(t *testing.T) {
q.files = queuedFileList{{name: "a"}, {name: "b"}, {name: "c"}, {name: "d"}}
q.deleteAt(i)
if l := len(q.files); l != 3 {
t.Fatal("deleteAt(%d) failed; %d != 3", i, l)
t.Fatalf("deleteAt(%d) failed; %d != 3", i, l)
}
}
q.files = queuedFileList{{name: "a"}}
q.deleteAt(0)
if l := len(q.files); l != 0 {
t.Fatal("deleteAt(only) failed; %d != 0", l)
t.Fatalf("deleteAt(only) failed; %d != 0", l)
}
}

View File

View File

View File

View File

View File

@@ -24,11 +24,10 @@ import (
)
var cfg Configuration
var Version string = "unknown-dev"
var Version = "unknown-dev"
var (
myID string
config ini.Config
myID string
)
var (
@@ -41,7 +40,7 @@ var (
)
func main() {
flag.StringVar(&confDir, "home", "~/.syncthing", "Set configuration directory")
flag.StringVar(&confDir, "home", getDefaultConfDir(), "Set configuration directory")
flag.StringVar(&trace, "debug.trace", "", "(connect,net,idx,file,pull)")
flag.StringVar(&profiler, "debug.profiler", "", "(addr)")
flag.BoolVar(&showVersion, "version", false, "Show version")
@@ -83,7 +82,7 @@ func main() {
fatalErr(err)
}
myID = string(certId(cert.Certificate[0]))
myID = string(certID(cert.Certificate[0]))
log.SetPrefix("[" + myID[0:5] + "] ")
logger.SetPrefix("[" + myID[0:5] + "] ")
@@ -139,7 +138,7 @@ func main() {
cfg, err = readConfigXML(nil)
cfg.Repositories = []RepositoryConfiguration{
{
Directory: "~/Sync",
Directory: path.Join(getHomeDir(), "Sync"),
Nodes: []NodeConfiguration{
{NodeID: myID, Addresses: []string{"dynamic"}},
},
@@ -188,16 +187,26 @@ func main() {
// GUI
if cfg.Options.GUIEnabled && cfg.Options.GUIAddress != "" {
host, port, err := net.SplitHostPort(cfg.Options.GUIAddress)
addr, err := net.ResolveTCPAddr("tcp", cfg.Options.GUIAddress)
if err != nil {
warnf("Cannot start GUI on %q: %v", cfg.Options.GUIAddress, err)
} else {
if len(host) > 0 {
infof("Starting web GUI on http://%s", cfg.Options.GUIAddress)
} else {
infof("Starting web GUI on port %s", port)
var hostOpen, hostShow string
switch {
case addr.IP == nil:
hostOpen = "localhost"
hostShow = "0.0.0.0"
case addr.IP.IsUnspecified():
hostOpen = "localhost"
hostShow = addr.IP.String()
default:
hostOpen = addr.IP.String()
hostShow = hostOpen
}
infof("Starting web GUI on http://%s:%d/", hostShow, addr.Port)
startGUI(cfg.Options.GUIAddress, m)
openURL(fmt.Sprintf("http://%s:%d", hostOpen, addr.Port))
}
}
@@ -319,6 +328,13 @@ func saveConfigLoop(cfgFile string) {
continue
}
if runtime.GOOS == "windows" {
err := os.Remove(cfgFile)
if err != nil && !os.IsNotExist(err) {
warnln(err)
}
}
err = os.Rename(cfgFile+".tmp", cfgFile)
if err != nil {
warnln(err)
@@ -388,7 +404,7 @@ listen:
continue
}
remoteID := certId(tc.ConnectionState().PeerCertificates[0].Raw)
remoteID := certID(tc.ConnectionState().PeerCertificates[0].Raw)
if remoteID == myID {
warnf("Connect from myself (%s) - should not happen", remoteID)
@@ -469,7 +485,7 @@ func connect(myID string, disc *discover.Discoverer, m *Model, tlsCfg *tls.Confi
continue
}
remoteID := certId(conn.ConnectionState().PeerCertificates[0].Raw)
remoteID := certID(conn.ConnectionState().PeerCertificates[0].Raw)
if remoteID != nodeCfg.NodeID {
warnln("Unexpected nodeID", remoteID, "!=", nodeCfg.NodeID)
conn.Close()
@@ -502,7 +518,10 @@ func saveIndex(m *Model) {
gzw := gzip.NewWriter(idxf)
protocol.IndexMessage{"local", m.ProtocolIndex()}.EncodeXDR(gzw)
protocol.IndexMessage{
Repository: "local",
Files: m.ProtocolIndex(),
}.EncodeXDR(gzw)
gzw.Close()
idxf.Close()
os.Rename(fullName+".tmp", fullName)
@@ -542,16 +561,38 @@ func ensureDir(dir string, mode int) {
}
func expandTilde(p string) string {
if runtime.GOOS == "windows" {
return p
}
if strings.HasPrefix(p, "~/") {
return strings.Replace(p, "~", getHomeDir(), 1)
return strings.Replace(p, "~", getUnixHomeDir(), 1)
}
return p
}
func getHomeDir() string {
func getUnixHomeDir() string {
home := os.Getenv("HOME")
if home == "" {
fatalln("No home directory?")
}
return home
}
func getHomeDir() string {
if runtime.GOOS == "windows" {
home := os.Getenv("HOMEDRIVE") + os.Getenv("HOMEPATH")
if home == "" {
home = os.Getenv("USERPROFILE")
}
return home
}
return getUnixHomeDir()
}
func getDefaultConfDir() string {
if runtime.GOOS == "windows" {
return path.Join(os.Getenv("AppData"), "syncthing")
}
return expandTilde("~/.syncthing")
}

View File

@@ -64,9 +64,6 @@ type Connection interface {
const (
idxBcastHoldtime = 15 * time.Second // Wait at least this long after the last index modification
idxBcastMaxDelay = 120 * time.Second // Unless we've already waited this long
minFileHoldTimeS = 60 // Never allow file changes more often than this
maxFileHoldTimeS = 600 // Always allow file changes at least this often
)
var (
@@ -194,7 +191,10 @@ func (m *Model) ConnectionStats() map[string]ConnectionInfo {
}
}
ci.Completion = int(100 * have / tot)
ci.Completion = 100
if tot != 0 {
ci.Completion = int(100 * have / tot)
}
res[node] = ci
}

34
cmd/syncthing/openurl.go Normal file
View File

@@ -0,0 +1,34 @@
/*
Copyright 2011 Google Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package main
import (
"os/exec"
"runtime"
)
func openURL(url string) error {
if runtime.GOOS == "windows" {
return exec.Command("cmd.exe", "/C", "start "+url).Run()
}
if runtime.GOOS == "darwin" {
return exec.Command("open", url).Run()
}
return exec.Command("xdg-open", url).Run()
}

View File

@@ -6,7 +6,7 @@ import (
)
const (
MAX_CHANGE_HISTORY = 4
MaxChangeHistory = 4
)
type change struct {
@@ -45,8 +45,8 @@ func (h changeHistory) bandwidth(t time.Time) int64 {
func (h *changeHistory) append(size int64, t time.Time) {
c := change{size, t}
if len(h.changes) == MAX_CHANGE_HISTORY {
h.changes = h.changes[1:MAX_CHANGE_HISTORY]
if len(h.changes) == MaxChangeHistory {
h.changes = h.changes[1:MaxChangeHistory]
}
h.changes = append(h.changes, c)
}

View File

@@ -21,7 +21,7 @@ func TestSuppressor(t *testing.T) {
// bw is 10000 / 10 = 1000
t1 = t0.Add(10 * time.Second)
if bw := s.changes["foo"].bandwidth(t1); bw != 1000 {
t.Error("Incorrect bw %d", bw)
t.Errorf("Incorrect bw %d", bw)
}
sup, prev = s.suppress("foo", 10000, t1)
if sup {
@@ -34,7 +34,7 @@ func TestSuppressor(t *testing.T) {
// bw is (10000 + 10000) / 11 = 1818
t1 = t0.Add(11 * time.Second)
if bw := s.changes["foo"].bandwidth(t1); bw != 1818 {
t.Error("Incorrect bw %d", bw)
t.Errorf("Incorrect bw %d", bw)
}
sup, prev = s.suppress("foo", 100500, t1)
if sup {
@@ -47,7 +47,7 @@ func TestSuppressor(t *testing.T) {
// bw is (10000 + 10000 + 100500) / 12 = 10041
t1 = t0.Add(12 * time.Second)
if bw := s.changes["foo"].bandwidth(t1); bw != 10041 {
t.Error("Incorrect bw %d", bw)
t.Errorf("Incorrect bw %d", bw)
}
sup, prev = s.suppress("foo", 10000000, t1) // value will be ignored
if !sup {
@@ -60,7 +60,7 @@ func TestSuppressor(t *testing.T) {
// bw is (10000 + 10000 + 100500) / 15 = 8033
t1 = t0.Add(15 * time.Second)
if bw := s.changes["foo"].bandwidth(t1); bw != 8033 {
t.Error("Incorrect bw %d", bw)
t.Errorf("Incorrect bw %d", bw)
}
sup, prev = s.suppress("foo", 10000000, t1)
if sup {
@@ -84,29 +84,29 @@ func TestHistory(t *testing.T) {
t.Errorf("Incorrect first record size %d", s)
}
for i := 1; i < MAX_CHANGE_HISTORY; i++ {
for i := 1; i < MaxChangeHistory; i++ {
h.append(int64(40+i), t0.Add(time.Duration(i)*time.Second))
}
if l := len(h.changes); l != MAX_CHANGE_HISTORY {
if l := len(h.changes); l != MaxChangeHistory {
t.Errorf("Incorrect history length %d", l)
}
if s := h.changes[0].size; s != 40 {
t.Errorf("Incorrect first record size %d", s)
}
if s := h.changes[MAX_CHANGE_HISTORY-1].size; s != 40+MAX_CHANGE_HISTORY-1 {
if s := h.changes[MaxChangeHistory-1].size; s != 40+MaxChangeHistory-1 {
t.Errorf("Incorrect last record size %d", s)
}
h.append(999, t0.Add(time.Duration(999)*time.Second))
if l := len(h.changes); l != MAX_CHANGE_HISTORY {
if l := len(h.changes); l != MaxChangeHistory {
t.Errorf("Incorrect history length %d", l)
}
if s := h.changes[0].size; s != 41 {
t.Errorf("Incorrect first record size %d", s)
}
if s := h.changes[MAX_CHANGE_HISTORY-1].size; s != 999 {
if s := h.changes[MaxChangeHistory-1].size; s != 999 {
t.Errorf("Incorrect last record size %d", s)
}

View File

View File

View File

View File

View File

View File

@@ -25,7 +25,7 @@ func loadCert(dir string) (tls.Certificate, error) {
return tls.LoadX509KeyPair(path.Join(dir, "cert.pem"), path.Join(dir, "key.pem"))
}
func certId(bs []byte) string {
func certID(bs []byte) string {
hf := sha256.New()
hf.Write(bs)
id := hf.Sum(nil)

View File

View File

View File

View File

View File

@@ -34,7 +34,7 @@ type Discoverer struct {
}
var (
ErrIncorrectMagic = errors.New("Incorrect magic number")
ErrIncorrectMagic = errors.New("incorrect magic number")
)
// We tolerate a certain amount of errors because we might be running on
@@ -80,24 +80,75 @@ func (d *Discoverer) sendAnnouncements() {
var errCounter = 0
var err error
remote := &net.UDPAddr{
IP: net.IP{255, 255, 255, 255},
Port: AnnouncementPort,
}
for errCounter < maxErrors {
for _, ipStr := range allBroadcasts() {
var addrStr = ipStr + ":21025"
intfs, err := net.Interfaces()
if err != nil {
log.Printf("discover/listInterfaces: %v; no local announcements", err)
return
}
remote, err := net.ResolveUDPAddr("udp4", addrStr)
if err != nil {
log.Printf("discover/external: %v; no external announcements", err)
return
}
for _, intf := range intfs {
if intf.Flags&(net.FlagBroadcast|net.FlagLoopback) == net.FlagBroadcast {
addrs, err := intf.Addrs()
if err != nil {
log.Println("discover/listAddrs: warning:", err)
errCounter++
continue
}
if Debug {
fmt.Println("send announcement -> ", remote)
}
_, _, err = d.conn.WriteMsgUDP(buf, nil, remote)
if err != nil {
log.Println("discover/write: warning:", err)
errCounter++
} else {
var srcAddr string
for _, addr := range addrs {
if strings.Contains(addr.String(), ".") {
// Found an IPv4 adress
parts := strings.Split(addr.String(), "/")
srcAddr = parts[0]
break
}
}
if len(srcAddr) == 0 {
if Debug {
log.Println("discover: debug: no source address found on interface", intf.Name)
}
continue
}
iaddr, err := net.ResolveUDPAddr("udp4", srcAddr+":0")
if err != nil {
log.Println("discover/resolve: warning:", err)
errCounter++
continue
}
conn, err := net.ListenUDP("udp4", iaddr)
if err != nil {
log.Println("discover/listen: warning:", err)
errCounter++
continue
}
if Debug {
log.Println("discover: debug: send announcement from", conn.LocalAddr(), "to", remote, "on", intf.Name)
}
_, err = conn.WriteTo(buf, remote)
if err != nil {
// Some interfaces don't seem to support broadcast even though the flags claims they do, i.e. vmnet
conn.Close()
if Debug {
log.Println("discover/write: debug:", err)
}
errCounter++
continue
}
conn.Close()
errCounter = 0
}
}
@@ -123,9 +174,9 @@ func (d *Discoverer) sendExtAnnouncements() {
for errCounter < maxErrors {
if Debug {
fmt.Println("send announcement -> ", remote)
log.Println("send announcement -> ", remote)
}
_, _, err = d.conn.WriteMsgUDP(buf, nil, remote)
_, err = d.conn.WriteTo(buf, remote)
if err != nil {
log.Println("discover/write: warning:", err)
errCounter++
@@ -134,7 +185,7 @@ func (d *Discoverer) sendExtAnnouncements() {
}
time.Sleep(d.ExtBroadcastIntv)
}
log.Println("discover/write: %v: stopping due to too many errors:", remote, err)
log.Printf("discover/write: %v: stopping due to too many errors: %v", remote, err)
}
func (d *Discoverer) recvAnnouncements() {
@@ -184,7 +235,6 @@ func (d *Discoverer) recvAnnouncements() {
d.registryLock.Lock()
_, seen := d.registry[pkt.NodeID]
if !seen {
fmt.Println("new node seen, forced announce")
select {
case d.forcedBroadcastTick <- time.Now():
}
@@ -290,38 +340,3 @@ func ipStr(ip []byte) string {
}
return strings.Join(ss, s)
}
func allBroadcasts() []string {
var bcasts = make(map[string]bool)
addrs, err := net.InterfaceAddrs()
if err != nil {
return nil
}
for _, addr := range addrs {
switch {
case strings.HasPrefix(addr.String(), "127."):
// Ignore v4 localhost
case strings.Contains(addr.String(), ":"):
// Ignore all v6, because we need link local multicast there which I haven't implemented
default:
if in, ok := addr.(*net.IPNet); ok {
il := len(in.IP) - 1
ml := len(in.Mask) - 1
for i := range in.Mask {
in.IP[il-i] = in.IP[il-i] | ^in.Mask[ml-i]
}
parts := strings.Split(in.String(), "/")
bcasts[parts[0]] = true
}
}
}
var l []string
for ip := range bcasts {
l = append(l, ip)
}
return l
}

173
files/set.go Normal file
View File

@@ -0,0 +1,173 @@
package fileset
import "sync"
type File struct {
Key Key
Modified int64
Flags uint32
Data interface{}
}
type Key struct {
Name string
Version uint32
}
type fileRecord struct {
Usage int
File File
}
type bitset uint64
func (a Key) newerThan(b Key) bool {
return a.Version > b.Version
}
type Set struct {
mutex sync.RWMutex
files map[Key]fileRecord
remoteKey [64]map[string]Key
globalAvailability map[string]bitset
globalKey map[string]Key
}
func NewSet() *Set {
var m = Set{
files: make(map[Key]fileRecord),
globalAvailability: make(map[string]bitset),
globalKey: make(map[string]Key),
}
return &m
}
func (m *Set) AddLocal(fs []File) {
m.mutex.Lock()
m.unlockedAddRemote(0, fs)
m.mutex.Unlock()
}
func (m *Set) SetLocal(fs []File) {
m.mutex.Lock()
m.unlockedSetRemote(0, fs)
m.mutex.Unlock()
}
func (m *Set) AddRemote(cid uint, fs []File) {
if cid < 1 || cid > 63 {
panic("Connection ID must be in the range 1 - 63 inclusive")
}
m.mutex.Lock()
m.unlockedAddRemote(cid, fs)
m.mutex.Unlock()
}
func (m *Set) SetRemote(cid uint, fs []File) {
if cid < 1 || cid > 63 {
panic("Connection ID must be in the range 1 - 63 inclusive")
}
m.mutex.Lock()
m.unlockedSetRemote(cid, fs)
m.mutex.Unlock()
}
func (m *Set) unlockedAddRemote(cid uint, fs []File) {
remFiles := m.remoteKey[cid]
for _, f := range fs {
n := f.Key.Name
if ck, ok := remFiles[n]; ok && ck == f.Key {
// The remote already has exactly this file, skip it
continue
}
remFiles[n] = f.Key
// Keep the block list or increment the usage
if br, ok := m.files[f.Key]; !ok {
m.files[f.Key] = fileRecord{
Usage: 1,
File: f,
}
} else {
br.Usage++
m.files[f.Key] = br
}
// Update global view
gk, ok := m.globalKey[n]
switch {
case ok && f.Key == gk:
av := m.globalAvailability[n]
av |= 1 << cid
m.globalAvailability[n] = av
case f.Key.newerThan(gk):
m.globalKey[n] = f.Key
m.globalAvailability[n] = 1 << cid
}
}
}
func (m *Set) unlockedSetRemote(cid uint, fs []File) {
// Decrement usage for all files belonging to this remote, and remove
// those that are no longer needed.
for _, fk := range m.remoteKey[cid] {
br, ok := m.files[fk]
switch {
case ok && br.Usage == 1:
delete(m.files, fk)
case ok && br.Usage > 1:
br.Usage--
m.files[fk] = br
}
}
// Clear existing remote remoteKey
m.remoteKey[cid] = make(map[string]Key)
// Recalculate global based on all remaining remoteKey
for n := range m.globalKey {
var nk Key // newest key
var na bitset // newest availability
for i, rem := range m.remoteKey {
if rk, ok := rem[n]; ok {
switch {
case rk == nk:
na |= 1 << uint(i)
case rk.newerThan(nk):
nk = rk
na = 1 << uint(i)
}
}
}
if na != 0 {
// Someone had the file
m.globalKey[n] = nk
m.globalAvailability[n] = na
} else {
// Noone had the file
delete(m.globalKey, n)
delete(m.globalAvailability, n)
}
}
// Add new remote remoteKey to the mix
m.unlockedAddRemote(cid, fs)
}
func (m *Set) Need(cid uint) []File {
var fs []File
m.mutex.Lock()
for name, gk := range m.globalKey {
if gk.newerThan(m.remoteKey[cid][name]) {
fs = append(fs, m.files[gk].File)
}
}
m.mutex.Unlock()
return fs
}

207
files/set_test.go Normal file
View File

@@ -0,0 +1,207 @@
package fileset
import (
"fmt"
"reflect"
"testing"
)
func TestGlobalSet(t *testing.T) {
m := NewSet()
local := []File{
File{Key{"a", 1000}, 0, 0, nil},
File{Key{"b", 1000}, 0, 0, nil},
File{Key{"c", 1000}, 0, 0, nil},
File{Key{"d", 1000}, 0, 0, nil},
}
remote := []File{
File{Key{"a", 1000}, 0, 0, nil},
File{Key{"b", 1001}, 0, 0, nil},
File{Key{"c", 1002}, 0, 0, nil},
File{Key{"e", 1000}, 0, 0, nil},
}
expectedGlobal := map[string]Key{
"a": local[0].Key,
"b": remote[1].Key,
"c": remote[2].Key,
"d": local[3].Key,
"e": remote[3].Key,
}
m.SetLocal(local)
m.SetRemote(1, remote)
if !reflect.DeepEqual(m.globalKey, expectedGlobal) {
t.Errorf("Global incorrect;\n%v !=\n%v", m.globalKey, expectedGlobal)
}
if lb := len(m.files); lb != 7 {
t.Errorf("Num files incorrect %d != 7\n%v", lb, m.files)
}
}
func BenchmarkSetLocal10k(b *testing.B) {
m := NewSet()
var local []File
for i := 0; i < 10000; i++ {
local = append(local, File{Key{fmt.Sprintf("file%d"), 1000}, 0, 0, nil})
}
var remote []File
for i := 0; i < 10000; i++ {
remote = append(remote, File{Key{fmt.Sprintf("file%d"), 1000}, 0, 0, nil})
}
m.SetRemote(1, remote)
b.ResetTimer()
for i := 0; i < b.N; i++ {
m.SetLocal(local)
}
}
func BenchmarkSetLocal10(b *testing.B) {
m := NewSet()
var local []File
for i := 0; i < 10; i++ {
local = append(local, File{Key{fmt.Sprintf("file%d"), 1000}, 0, 0, nil})
}
var remote []File
for i := 0; i < 10000; i++ {
remote = append(remote, File{Key{fmt.Sprintf("file%d"), 1000}, 0, 0, nil})
}
m.SetRemote(1, remote)
b.ResetTimer()
for i := 0; i < b.N; i++ {
m.SetLocal(local)
}
}
func BenchmarkAddLocal10k(b *testing.B) {
m := NewSet()
var local []File
for i := 0; i < 10000; i++ {
local = append(local, File{Key{fmt.Sprintf("file%d"), 1000}, 0, 0, nil})
}
var remote []File
for i := 0; i < 10000; i++ {
remote = append(remote, File{Key{fmt.Sprintf("file%d"), 1000}, 0, 0, nil})
}
m.SetRemote(1, remote)
m.SetLocal(local)
b.ResetTimer()
for i := 0; i < b.N; i++ {
b.StopTimer()
for j := range local {
local[j].Key.Version++
}
b.StartTimer()
m.AddLocal(local)
}
}
func BenchmarkAddLocal10(b *testing.B) {
m := NewSet()
var local []File
for i := 0; i < 10; i++ {
local = append(local, File{Key{fmt.Sprintf("file%d"), 1000}, 0, 0, nil})
}
var remote []File
for i := 0; i < 10000; i++ {
remote = append(remote, File{Key{fmt.Sprintf("file%d"), 1000}, 0, 0, nil})
}
m.SetRemote(1, remote)
m.SetLocal(local)
b.ResetTimer()
for i := 0; i < b.N; i++ {
for j := range local {
local[j].Key.Version++
}
m.AddLocal(local)
}
}
func TestGlobalReset(t *testing.T) {
m := NewSet()
local := []File{
File{Key{"a", 1000}, 0, 0, nil},
File{Key{"b", 1000}, 0, 0, nil},
File{Key{"c", 1000}, 0, 0, nil},
File{Key{"d", 1000}, 0, 0, nil},
}
remote := []File{
File{Key{"a", 1000}, 0, 0, nil},
File{Key{"b", 1001}, 0, 0, nil},
File{Key{"c", 1002}, 0, 0, nil},
File{Key{"e", 1000}, 0, 0, nil},
}
expectedGlobalKey := map[string]Key{
"a": local[0].Key,
"b": local[1].Key,
"c": local[2].Key,
"d": local[3].Key,
}
m.SetLocal(local)
m.SetRemote(1, remote)
m.SetRemote(1, nil)
if !reflect.DeepEqual(m.globalKey, expectedGlobalKey) {
t.Errorf("Global incorrect;\n%v !=\n%v", m.globalKey, expectedGlobalKey)
}
if lb := len(m.files); lb != 4 {
t.Errorf("Num files incorrect %d != 4\n%v", lb, m.files)
}
}
func TestNeed(t *testing.T) {
m := NewSet()
local := []File{
File{Key{"a", 1000}, 0, 0, nil},
File{Key{"b", 1000}, 0, 0, nil},
File{Key{"c", 1000}, 0, 0, nil},
File{Key{"d", 1000}, 0, 0, nil},
}
remote := []File{
File{Key{"a", 1000}, 0, 0, nil},
File{Key{"b", 1001}, 0, 0, nil},
File{Key{"c", 1002}, 0, 0, nil},
File{Key{"e", 1000}, 0, 0, nil},
}
shouldNeed := []File{
File{Key{"b", 1001}, 0, 0, nil},
File{Key{"c", 1002}, 0, 0, nil},
File{Key{"e", 1000}, 0, 0, nil},
}
m.SetLocal(local)
m.SetRemote(1, remote)
need := m.Need(0)
if !reflect.DeepEqual(need, shouldNeed) {
t.Errorf("Need incorrect;\n%v !=\n%v", need, shouldNeed)
}
}

View File

@@ -46,7 +46,6 @@ func (e *ErrPipe) Write(data []byte) (int, error) {
e.PipeWriter.CloseWithError(e.err)
e.closed = true
return n, e.err
} else {
return e.PipeWriter.Write(data)
}
return e.PipeWriter.Write(data)
}

View File

@@ -31,7 +31,8 @@ const (
)
var (
ErrClusterHash = fmt.Errorf("Configuration error: mismatched cluster hash")
ErrClusterHash = fmt.Errorf("configuration error: mismatched cluster hash")
ErrClosed = errors.New("connection closed")
)
type Model interface {
@@ -56,7 +57,7 @@ type Connection struct {
xw *xdr.Writer
closed bool
awaiting map[int]chan asyncResult
nextId int
nextID int
indexSent map[string]map[string][2]int64
peerOptions map[string]string
myOptions map[string]string
@@ -68,8 +69,6 @@ type Connection struct {
statisticsLock sync.Mutex
}
var ErrClosed = errors.New("Connection closed")
type asyncResult struct {
val []byte
err error
@@ -105,7 +104,7 @@ func NewConnection(nodeID string, reader io.Reader, writer io.Writer, receiver M
c.myOptions = options
go func() {
c.Lock()
header{0, c.nextId, messageTypeOptions}.encodeXDR(c.xw)
header{0, c.nextID, messageTypeOptions}.encodeXDR(c.xw)
var om OptionsMessage
for k, v := range options {
om.Options = append(om.Options, Option{k, v})
@@ -118,7 +117,7 @@ func NewConnection(nodeID string, reader io.Reader, writer io.Writer, receiver M
if err != nil {
log.Println("Warning: Write error during initial handshake:", err)
}
c.nextId++
c.nextID++
c.Unlock()
}()
}
@@ -155,12 +154,12 @@ func (c *Connection) Index(repo string, idx []FileInfo) {
idx = diff
}
header{0, c.nextId, msgType}.encodeXDR(c.xw)
header{0, c.nextID, msgType}.encodeXDR(c.xw)
_, err := IndexMessage{repo, idx}.encodeXDR(c.xw)
if err == nil {
err = c.flush()
}
c.nextId = (c.nextId + 1) & 0xfff
c.nextID = (c.nextID + 1) & 0xfff
c.hasSentIndex = true
c.Unlock()
@@ -178,8 +177,8 @@ func (c *Connection) Request(repo string, name string, offset int64, size int) (
return nil, ErrClosed
}
rc := make(chan asyncResult)
c.awaiting[c.nextId] = rc
header{0, c.nextId, messageTypeRequest}.encodeXDR(c.xw)
c.awaiting[c.nextID] = rc
header{0, c.nextID, messageTypeRequest}.encodeXDR(c.xw)
_, err := RequestMessage{repo, name, uint64(offset), uint32(size)}.encodeXDR(c.xw)
if err == nil {
err = c.flush()
@@ -189,7 +188,7 @@ func (c *Connection) Request(repo string, name string, offset int64, size int) (
c.close(err)
return nil, err
}
c.nextId = (c.nextId + 1) & 0xfff
c.nextID = (c.nextID + 1) & 0xfff
c.Unlock()
res, ok := <-rc
@@ -206,8 +205,8 @@ func (c *Connection) ping() bool {
return false
}
rc := make(chan asyncResult, 1)
c.awaiting[c.nextId] = rc
header{0, c.nextId, messageTypePing}.encodeXDR(c.xw)
c.awaiting[c.nextID] = rc
header{0, c.nextID, messageTypePing}.encodeXDR(c.xw)
err := c.flush()
if err != nil {
c.Unlock()
@@ -218,7 +217,7 @@ func (c *Connection) ping() bool {
c.close(c.xw.Error())
return false
}
c.nextId = (c.nextId + 1) & 0xfff
c.nextID = (c.nextID + 1) & 0xfff
c.Unlock()
res, ok := <-rc
@@ -268,7 +267,7 @@ loop:
break loop
}
if hdr.version != 0 {
c.close(fmt.Errorf("Protocol error: %s: unknown message version %#x", c.ID, hdr.version))
c.close(fmt.Errorf("protocol error: %s: unknown message version %#x", c.id, hdr.version))
break loop
}
@@ -371,7 +370,7 @@ loop:
}
default:
c.close(fmt.Errorf("Protocol error: %s: unknown message type %#x", c.ID, hdr.msgType))
c.close(fmt.Errorf("protocol error: %s: unknown message type %#x", c.id, hdr.msgType))
break loop
}
}
@@ -410,10 +409,10 @@ func (c *Connection) pingerLoop() {
select {
case ok := <-rc:
if !ok {
c.close(fmt.Errorf("Ping failure"))
c.close(fmt.Errorf("ping failure"))
}
case <-time.After(pingTimeout):
c.close(fmt.Errorf("Ping timeout"))
c.close(fmt.Errorf("ping timeout"))
}
}
}

View File

@@ -38,7 +38,7 @@ func TestPing(t *testing.T) {
}
func TestPingErr(t *testing.T) {
e := errors.New("Something broke")
e := errors.New("something broke")
for i := 0; i < 12; i++ {
for j := 0; j < 12; j++ {
@@ -64,7 +64,7 @@ func TestPingErr(t *testing.T) {
}
func TestRequestResponseErr(t *testing.T) {
e := errors.New("Something broke")
e := errors.New("something broke")
var pass bool
for i := 0; i < 48; i++ {
@@ -99,16 +99,16 @@ func TestRequestResponseErr(t *testing.T) {
t.Errorf("Incorrect response data %q", string(d))
}
if m0.repo != "default" {
t.Error("Incorrect repo %q", m0.repo)
t.Errorf("Incorrect repo %q", m0.repo)
}
if m0.name != "tn" {
t.Error("Incorrect name %q", m0.name)
t.Errorf("Incorrect name %q", m0.name)
}
if m0.offset != 1234 {
t.Error("Incorrect offset %d", m0.offset)
t.Errorf("Incorrect offset %d", m0.offset)
}
if m0.size != 5678 {
t.Error("Incorrect size %d", m0.size)
t.Errorf("Incorrect size %d", m0.size)
}
t.Logf("Pass at %d+%d bytes", i, j)
pass = true

View File

@@ -5,7 +5,7 @@ import (
"io"
)
var ErrElementSizeExceeded = errors.New("Element size exceeded")
var ErrElementSizeExceeded = errors.New("element size exceeded")
type Reader struct {
r io.Reader