Mirror of https://github.com/syncthing/syncthing.git, synced 2026-01-02 19:09:11 -05:00

Compare commits (23 commits)
| SHA1 |
|---|
| 2d0600de38 |
| 6a1c055288 |
| b9ec30ebdb |
| 428164f395 |
| ba59e0d3f0 |
| 5d8f0f835e |
| b4a1aadd1b |
| 8f41d90ab1 |
| 9743386166 |
| 0afcb5b7e7 |
| 043dea760f |
| 0618e2b9b4 |
| 3c171d281c |
| c217b7cd22 |
| 23593c3d20 |
| 192117dc11 |
| 24b8f9211a |
| 51788d6f0e |
| ea0bed2238 |
| e2fe57c440 |
| 434a0ccf2a |
| e7bf3ac108 |
| c5bdaebf2b |
.gitignore (vendored): 3 changes

@@ -1,3 +1,4 @@
syncthing
syncthing.exe
*.tar.gz
dist
*.zip
@@ -1,4 +1,4 @@
syncthing
syncthing [](https://drone.io/github.com/calmh/syncthing/latest)
=========

This is the `syncthing` project. The following are the project goals:

@@ -18,7 +18,7 @@ cd gui
for f in $(find . -type f) ; do
f="${f#./}"
echo "gr, _ = gzip.NewReader(bytes.NewBuffer([]byte{"
gzip -c $f | od -vt x1 | sed 's/^[0-9a-f]*//' | sed 's/\([0-9a-f][0-9a-f]\)/0x\1,/g'
gzip -n -c $f | od -vt x1 | sed 's/^[0-9a-f]*//' | sed 's/\([0-9a-f][0-9a-f]\)/0x\1,/g'
echo "}))"
echo "data, _ = ioutil.ReadAll(gr)"
echo "Assets[\"$f\"] = data"
auto/gui.files.go: 23620 changes (diff suppressed because it is too large)
build.sh: 118 changes

@@ -2,48 +2,90 @@
export COPYFILE_DISABLE=true

distFiles=(README.md LICENSE) # apart from the binary itself
version=$(git describe --always)
buildDir=dist

if [[ $fast != yes ]] ; then
build() {
go build -ldflags "-w -X main.Version $version" ./cmd/syncthing
}

prepare() {
./assets.sh | gofmt > auto/gui.files.go
go get -d
}

test() {
go test ./...
fi
}

if [[ -z $1 ]] ; then
go build -ldflags "-X main.Version $version"
elif [[ $1 == "tar" ]] ; then
go build -ldflags "-X main.Version $version" \
&& mkdir syncthing-dist \
&& cp syncthing README.md LICENSE syncthing-dist \
&& tar zcvf syncthing-dist.tar.gz syncthing-dist \
&& rm -rf syncthing-dist
elif [[ $1 == "all" ]] ; then
rm -rf "$buildDir"
mkdir -p "$buildDir" || exit 1
tarDist() {
name="$1"
mkdir -p "$name"
cp syncthing "${distFiles[@]}" "$name"
tar zcvf "$name.tar.gz" "$name"
rm -rf "$name"
}

export GOARM=7
for os in darwin-amd64 linux-amd64 linux-arm freebsd-amd64 windows-amd64 ; do
echo "$os"
export name="syncthing-$os"
export GOOS=${os%-*}
export GOARCH=${os#*-}
go build -ldflags "-X main.Version $version"
mkdir -p "$name"
cp README.md LICENSE "$name"
case $GOOS in
windows)
cp syncthing.exe "$buildDir/$name.exe"
mv syncthing.exe "$name"
zip -qr "$buildDir/$name.zip" "$name"
;;
*)
cp syncthing "$buildDir/$name"
mv syncthing "$name"
tar zcf "$buildDir/$name.tar.gz" "$name"
;;
esac
rm -r "$name"
done
fi
zipDist() {
name="$1"
mkdir -p "$name"
cp syncthing.exe "${distFiles[@]}" "$name"
zip -r "$name.zip" "$name"
rm -rf "$name"
}

case "$1" in
"")
build
;;

tar)
rm -f *.tar.gz *.zip
prepare
test || exit 1
build

eval $(go env)
name="syncthing-$GOOS-$GOARCH-$version"

tarDist "$name"
;;

all)
rm -f *.tar.gz *.zip
prepare
test || exit 1

export GOARM=7
for os in darwin-amd64 linux-amd64 linux-arm freebsd-amd64 windows-amd64 ; do
export GOOS=${os%-*}
export GOARCH=${os#*-}

build

name="syncthing-$os-$version"
case $GOOS in
windows)
zipDist "$name"
rm -f syncthing.exe
;;
*)
tarDist "$name"
rm -f syncthing
;;
esac
done
;;

upload)
tag=$(git describe)
shopt -s nullglob
for f in *gz *zip ; do
relup calmh/syncthing "$tag" "$f"
done
;;

*)
echo "Unknown build parameter $1"
;;
esac
cid/cid.go (new file): 42 lines

@@ -0,0 +1,42 @@
package cid

type Map struct {
toCid map[string]int
toName []string
}

func NewMap() *Map {
return &Map{
toCid: make(map[string]int),
}
}

func (m *Map) Get(name string) int {
cid, ok := m.toCid[name]
if ok {
return cid
}

// Find a free slot to get a new ID
for i, n := range m.toName {
if n == "" {
m.toName[i] = name
m.toCid[name] = i
return i
}
}

// Add it to the end since we didn't find a free slot
m.toName = append(m.toName, name)
cid = len(m.toName) - 1
m.toCid[name] = cid
return cid
}

func (m *Map) Clear(name string) {
cid, ok := m.toCid[name]
if ok {
m.toName[cid] = ""
delete(m.toCid, name)
}
}
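The new cid.Map above hands out small integer connection IDs and recycles slots freed by Clear. A minimal usage sketch, assuming the package is importable at the repository's path (the import path and names in main are not taken from the repository):

```go
package main

import (
	"fmt"

	"github.com/calmh/syncthing/cid" // assumed import path for the cid package above
)

func main() {
	m := cid.NewMap()

	a := m.Get("node-a") // first name gets ID 0
	b := m.Get("node-b") // next free slot, ID 1
	fmt.Println(a, b)    // 0 1

	m.Clear("node-a")    // frees slot 0 for reuse
	c := m.Get("node-c") // reuses the freed slot
	fmt.Println(c)       // 0

	fmt.Println(m.Get("node-b")) // existing names keep their ID: 1
}
```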
cmd/.gitignore (vendored, new file): 1 line

@@ -0,0 +1 @@
!syncthing
@@ -17,7 +17,7 @@ func Blocks(r io.Reader, blocksize int) ([]Block, error) {
var blocks []Block
var offset int64
for {
lr := &io.LimitedReader{r, int64(blocksize)}
lr := &io.LimitedReader{R: r, N: int64(blocksize)}
hf := sha256.New()
n, err := io.Copy(hf, lr)
if err != nil {
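The one-line change in this hunk replaces a positional io.LimitedReader literal with named fields (R, N), the form go vet expects for struct literals from other packages. A self-contained sketch of the same read-one-block-and-hash loop, not the project's actual Blocks implementation:

```go
package main

import (
	"bytes"
	"crypto/sha256"
	"fmt"
	"io"
)

func main() {
	r := bytes.NewReader(bytes.Repeat([]byte("x"), 300))
	const blocksize = 128

	for {
		// Named fields, as in the diff above: read at most one block.
		lr := &io.LimitedReader{R: r, N: int64(blocksize)}
		hf := sha256.New()
		n, err := io.Copy(hf, lr)
		if err != nil {
			fmt.Println("read error:", err)
			return
		}
		if n == 0 {
			break // end of input
		}
		fmt.Printf("block of %3d bytes: %x\n", n, hf.Sum(nil)[:8])
	}
}
```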
@@ -36,7 +36,7 @@ type OptionsConfiguration struct {
FollowSymlinks bool `xml:"followSymlinks" default:"true" ini:"follow-symlinks"`
GUIEnabled bool `xml:"guiEnabled" default:"true" ini:"gui-enabled"`
GUIAddress string `xml:"guiAddress" default:"127.0.0.1:8080" ini:"gui-address"`
GlobalAnnServer string `xml:"globalAnnounceServer" default:"syncthing.nym.se:22025" ini:"global-announce-server"`
GlobalAnnServer string `xml:"globalAnnounceServer" default:"announce.syncthing.net:22025" ini:"global-announce-server"`
GlobalAnnEnabled bool `xml:"globalAnnounceEnabled" default:"true" ini:"global-announce-enabled"`
LocalAnnEnabled bool `xml:"localAnnounceEnabled" default:"true" ini:"local-announce-enabled"`
ParallelRequests int `xml:"parallelRequests" default:"16" ini:"parallel-requests"`
@@ -46,7 +46,7 @@ type OptionsConfiguration struct {
MaxChangeKbps int `xml:"maxChangeKbps" default:"1000" ini:"max-change-bw"`
}

func setDefaults(data interface{}) error {
func setDefaults(data interface{}, setEmptySlices bool) error {
s := reflect.ValueOf(data).Elem()
t := s.Type()

@@ -56,14 +56,20 @@ func setDefaults(data interface{}) error {

v := tag.Get("default")
if len(v) > 0 {
if f.Kind().String() == "slice" && f.Len() != 0 {
continue
}

switch f.Interface().(type) {
case string:
f.SetString(v)

case []string:
rv := reflect.MakeSlice(reflect.TypeOf([]string{}), 1, 1)
rv.Index(0).SetString(v)
f.Set(rv)
if setEmptySlices {
rv := reflect.MakeSlice(reflect.TypeOf([]string{}), 1, 1)
rv.Index(0).SetString(v)
f.Set(rv)
}

case int:
i, err := strconv.ParseInt(v, 10, 64)
@@ -146,14 +152,16 @@ func uniqueStrings(ss []string) []string {
func readConfigXML(rd io.Reader) (Configuration, error) {
var cfg Configuration

setDefaults(&cfg)
setDefaults(&cfg.Options)
setDefaults(&cfg, false)
setDefaults(&cfg.Options, false)

var err error
if rd != nil {
err = xml.NewDecoder(rd).Decode(&cfg)
}

setDefaults(&cfg.Options, true)

cfg.Options.ListenAddress = uniqueStrings(cfg.Options.ListenAddress)
return cfg, err
}
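setDefaults gains a setEmptySlices flag so that slice defaults (such as listen addresses) can be applied in a separate pass after the XML has been decoded, while scalar defaults are still set up front. A stripped-down sketch of the tag-driven mechanism, using a hypothetical struct rather than the real OptionsConfiguration:

```go
package main

import (
	"fmt"
	"reflect"
	"strconv"
)

// opts is a hypothetical stand-in for a configuration struct with `default` tags.
type opts struct {
	Address string   `default:"127.0.0.1:8080"`
	Limit   int      `default:"16"`
	Listen  []string `default:"0.0.0.0:22000"`
}

// setDefaults fills fields from their `default` tag. Non-empty slices are left
// alone, and empty slices are only filled when setEmptySlices is true, which is
// the behaviour the diff above introduces.
func setDefaults(data interface{}, setEmptySlices bool) error {
	s := reflect.ValueOf(data).Elem()
	t := s.Type()
	for i := 0; i < t.NumField(); i++ {
		f := s.Field(i)
		v := t.Field(i).Tag.Get("default")
		if v == "" {
			continue
		}
		if f.Kind() == reflect.Slice && f.Len() != 0 {
			continue
		}
		switch f.Interface().(type) {
		case string:
			f.SetString(v)
		case int:
			n, err := strconv.ParseInt(v, 10, 64)
			if err != nil {
				return err
			}
			f.SetInt(n)
		case []string:
			if setEmptySlices {
				f.Set(reflect.ValueOf([]string{v}))
			}
		}
	}
	return nil
}

func main() {
	var o opts
	// Before decoding one would run setDefaults(&o, false); after decoding,
	// setDefaults(&o, true) fills slices the decoder left empty.
	if err := setDefaults(&o, true); err != nil {
		fmt.Println(err)
		return
	}
	fmt.Printf("%+v\n", o)
}
```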
@@ -272,7 +272,7 @@ func TestFileQueueThreadHandling(t *testing.T) {
close(start)
wg.Wait()
if int(gotTot) != total {
t.Error("Total mismatch; %d != %d", gotTot, total)
t.Errorf("Total mismatch; %d != %d", gotTot, total)
}
}

@@ -283,13 +283,13 @@ func TestDeleteAt(t *testing.T) {
q.files = queuedFileList{{name: "a"}, {name: "b"}, {name: "c"}, {name: "d"}}
q.deleteAt(i)
if l := len(q.files); l != 3 {
t.Fatal("deleteAt(%d) failed; %d != 3", i, l)
t.Fatalf("deleteAt(%d) failed; %d != 3", i, l)
}
}

q.files = queuedFileList{{name: "a"}}
q.deleteAt(0)
if l := len(q.files); l != 0 {
t.Fatal("deleteAt(only) failed; %d != 0", l)
t.Fatalf("deleteAt(only) failed; %d != 0", l)
}
}
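These hunks only swap t.Error and t.Fatal for their formatting variants: the non-f forms print their arguments as-is, so verbs like %d end up in the message literally, and go vet flags the mismatch. A minimal illustration (hypothetical test, not from the repository):

```go
package demo

import "testing"

func TestTotals(t *testing.T) {
	gotTot, total := 42, 42
	if gotTot != total {
		// t.Error("Total mismatch; %d != %d", gotTot, total) would print the
		// verbs literally; the f-variant actually formats them.
		t.Errorf("Total mismatch; %d != %d", gotTot, total)
	}
}
```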
@@ -24,11 +24,10 @@ import (
)

var cfg Configuration
var Version string = "unknown-dev"
var Version = "unknown-dev"

var (
myID string
config ini.Config
myID string
)

var (
@@ -41,7 +40,7 @@ var (
)

func main() {
flag.StringVar(&confDir, "home", "~/.syncthing", "Set configuration directory")
flag.StringVar(&confDir, "home", getDefaultConfDir(), "Set configuration directory")
flag.StringVar(&trace, "debug.trace", "", "(connect,net,idx,file,pull)")
flag.StringVar(&profiler, "debug.profiler", "", "(addr)")
flag.BoolVar(&showVersion, "version", false, "Show version")
@@ -83,7 +82,7 @@ func main() {
fatalErr(err)
}

myID = string(certId(cert.Certificate[0]))
myID = string(certID(cert.Certificate[0]))
log.SetPrefix("[" + myID[0:5] + "] ")
logger.SetPrefix("[" + myID[0:5] + "] ")

@@ -139,7 +138,7 @@ func main() {
cfg, err = readConfigXML(nil)
cfg.Repositories = []RepositoryConfiguration{
{
Directory: "~/Sync",
Directory: path.Join(getHomeDir(), "Sync"),
Nodes: []NodeConfiguration{
{NodeID: myID, Addresses: []string{"dynamic"}},
},
@@ -188,16 +187,26 @@ func main() {

// GUI
if cfg.Options.GUIEnabled && cfg.Options.GUIAddress != "" {
host, port, err := net.SplitHostPort(cfg.Options.GUIAddress)
addr, err := net.ResolveTCPAddr("tcp", cfg.Options.GUIAddress)
if err != nil {
warnf("Cannot start GUI on %q: %v", cfg.Options.GUIAddress, err)
} else {
if len(host) > 0 {
infof("Starting web GUI on http://%s", cfg.Options.GUIAddress)
} else {
infof("Starting web GUI on port %s", port)
var hostOpen, hostShow string
switch {
case addr.IP == nil:
hostOpen = "localhost"
hostShow = "0.0.0.0"
case addr.IP.IsUnspecified():
hostOpen = "localhost"
hostShow = addr.IP.String()
default:
hostOpen = addr.IP.String()
hostShow = hostOpen
}

infof("Starting web GUI on http://%s:%d/", hostShow, addr.Port)
startGUI(cfg.Options.GUIAddress, m)
openURL(fmt.Sprintf("http://%s:%d", hostOpen, addr.Port))
}
}
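The new GUI startup logic resolves the configured address once and derives two hosts from it: one to show in the log and one to open in the browser. A small sketch of how net.ResolveTCPAddr behaves for the three cases handled above, using generic example addresses:

```go
package main

import (
	"fmt"
	"net"
)

// hosts mirrors the three-way switch in the diff: no host part, an
// unspecified address, or a concrete address.
func hosts(listen string) (hostShow, hostOpen string, port int, err error) {
	addr, err := net.ResolveTCPAddr("tcp", listen)
	if err != nil {
		return "", "", 0, err
	}
	switch {
	case addr.IP == nil: // ":8080" has no host part at all
		return "0.0.0.0", "localhost", addr.Port, nil
	case addr.IP.IsUnspecified(): // "0.0.0.0:8080" listens everywhere
		return addr.IP.String(), "localhost", addr.Port, nil
	default: // "127.0.0.1:8080" is a concrete address
		return addr.IP.String(), addr.IP.String(), addr.Port, nil
	}
}

func main() {
	for _, a := range []string{":8080", "0.0.0.0:8080", "127.0.0.1:8080"} {
		show, open, port, err := hosts(a)
		fmt.Println(a, "->", show, open, port, err)
	}
}
```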
@@ -319,6 +328,13 @@ func saveConfigLoop(cfgFile string) {
continue
}

if runtime.GOOS == "windows" {
err := os.Remove(cfgFile)
if err != nil && !os.IsNotExist(err) {
warnln(err)
}
}

err = os.Rename(cfgFile+".tmp", cfgFile)
if err != nil {
warnln(err)
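The added block works around the fact that, at the time, os.Rename on Windows would not replace an existing file: the destination is removed first (ignoring a does-not-exist error), then the temp file is renamed into place. A minimal sketch of the same write-temp-then-rename pattern, with a hypothetical file name and current standard-library calls:

```go
package main

import (
	"log"
	"os"
	"runtime"
)

// saveAtomically writes data to path via a temp file and a rename, so a crash
// mid-write never leaves a truncated config behind.
func saveAtomically(path string, data []byte) error {
	tmp := path + ".tmp"
	if err := os.WriteFile(tmp, data, 0o600); err != nil {
		return err
	}
	if runtime.GOOS == "windows" {
		// Remove the target first; a missing target is not an error.
		if err := os.Remove(path); err != nil && !os.IsNotExist(err) {
			return err
		}
	}
	return os.Rename(tmp, path)
}

func main() {
	if err := saveAtomically("config.xml", []byte("<configuration/>\n")); err != nil {
		log.Fatal(err)
	}
}
```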
@@ -388,7 +404,7 @@ listen:
continue
}

remoteID := certId(tc.ConnectionState().PeerCertificates[0].Raw)
remoteID := certID(tc.ConnectionState().PeerCertificates[0].Raw)

if remoteID == myID {
warnf("Connect from myself (%s) - should not happen", remoteID)
@@ -469,7 +485,7 @@ func connect(myID string, disc *discover.Discoverer, m *Model, tlsCfg *tls.Confi
continue
}

remoteID := certId(conn.ConnectionState().PeerCertificates[0].Raw)
remoteID := certID(conn.ConnectionState().PeerCertificates[0].Raw)
if remoteID != nodeCfg.NodeID {
warnln("Unexpected nodeID", remoteID, "!=", nodeCfg.NodeID)
conn.Close()
@@ -502,7 +518,10 @@ func saveIndex(m *Model) {

gzw := gzip.NewWriter(idxf)

protocol.IndexMessage{"local", m.ProtocolIndex()}.EncodeXDR(gzw)
protocol.IndexMessage{
Repository: "local",
Files: m.ProtocolIndex(),
}.EncodeXDR(gzw)
gzw.Close()
idxf.Close()
os.Rename(fullName+".tmp", fullName)
@@ -542,16 +561,38 @@ func ensureDir(dir string, mode int) {
}

func expandTilde(p string) string {
if runtime.GOOS == "windows" {
return p
}

if strings.HasPrefix(p, "~/") {
return strings.Replace(p, "~", getHomeDir(), 1)
return strings.Replace(p, "~", getUnixHomeDir(), 1)
}
return p
}

func getHomeDir() string {
func getUnixHomeDir() string {
home := os.Getenv("HOME")
if home == "" {
fatalln("No home directory?")
}
return home
}

func getHomeDir() string {
if runtime.GOOS == "windows" {
home := os.Getenv("HOMEDRIVE") + os.Getenv("HOMEPATH")
if home == "" {
home = os.Getenv("USERPROFILE")
}
return home
}
return getUnixHomeDir()
}

func getDefaultConfDir() string {
if runtime.GOOS == "windows" {
return path.Join(os.Getenv("AppData"), "syncthing")
}
return expandTilde("~/.syncthing")
}
@@ -64,9 +64,6 @@ type Connection interface {
const (
idxBcastHoldtime = 15 * time.Second // Wait at least this long after the last index modification
idxBcastMaxDelay = 120 * time.Second // Unless we've already waited this long

minFileHoldTimeS = 60 // Never allow file changes more often than this
maxFileHoldTimeS = 600 // Always allow file changes at least this often
)

var (
@@ -194,7 +191,10 @@ func (m *Model) ConnectionStats() map[string]ConnectionInfo {
}
}

ci.Completion = int(100 * have / tot)
ci.Completion = 100
if tot != 0 {
ci.Completion = int(100 * have / tot)
}

res[node] = ci
}
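In the ConnectionStats hunk, completion now defaults to 100 and the percentage is only computed when tot is non-zero, so a peer with an empty index no longer triggers a division by zero. The guard in isolation, as a hypothetical helper:

```go
package main

import "fmt"

// completion returns how much of tot bytes are present locally, as a percentage.
// An empty index (tot == 0) counts as fully complete rather than dividing by zero.
func completion(have, tot int64) int {
	if tot == 0 {
		return 100
	}
	return int(100 * have / tot)
}

func main() {
	fmt.Println(completion(0, 0))    // 100
	fmt.Println(completion(50, 200)) // 25
}
```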
cmd/syncthing/openurl.go (new file): 34 lines

@@ -0,0 +1,34 @@
/*
Copyright 2011 Google Inc.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package main

import (
"os/exec"
"runtime"
)

func openURL(url string) error {
if runtime.GOOS == "windows" {
return exec.Command("cmd.exe", "/C", "start "+url).Run()
}

if runtime.GOOS == "darwin" {
return exec.Command("open", url).Run()
}

return exec.Command("xdg-open", url).Run()
}
@@ -6,7 +6,7 @@ import (
)

const (
MAX_CHANGE_HISTORY = 4
MaxChangeHistory = 4
)

type change struct {
@@ -45,8 +45,8 @@ func (h changeHistory) bandwidth(t time.Time) int64 {

func (h *changeHistory) append(size int64, t time.Time) {
c := change{size, t}
if len(h.changes) == MAX_CHANGE_HISTORY {
h.changes = h.changes[1:MAX_CHANGE_HISTORY]
if len(h.changes) == MaxChangeHistory {
h.changes = h.changes[1:MaxChangeHistory]
}
h.changes = append(h.changes, c)
}
@@ -21,7 +21,7 @@ func TestSuppressor(t *testing.T) {
// bw is 10000 / 10 = 1000
t1 = t0.Add(10 * time.Second)
if bw := s.changes["foo"].bandwidth(t1); bw != 1000 {
t.Error("Incorrect bw %d", bw)
t.Errorf("Incorrect bw %d", bw)
}
sup, prev = s.suppress("foo", 10000, t1)
if sup {
@@ -34,7 +34,7 @@ func TestSuppressor(t *testing.T) {
// bw is (10000 + 10000) / 11 = 1818
t1 = t0.Add(11 * time.Second)
if bw := s.changes["foo"].bandwidth(t1); bw != 1818 {
t.Error("Incorrect bw %d", bw)
t.Errorf("Incorrect bw %d", bw)
}
sup, prev = s.suppress("foo", 100500, t1)
if sup {
@@ -47,7 +47,7 @@ func TestSuppressor(t *testing.T) {
// bw is (10000 + 10000 + 100500) / 12 = 10041
t1 = t0.Add(12 * time.Second)
if bw := s.changes["foo"].bandwidth(t1); bw != 10041 {
t.Error("Incorrect bw %d", bw)
t.Errorf("Incorrect bw %d", bw)
}
sup, prev = s.suppress("foo", 10000000, t1) // value will be ignored
if !sup {
@@ -60,7 +60,7 @@ func TestSuppressor(t *testing.T) {
// bw is (10000 + 10000 + 100500) / 15 = 8033
t1 = t0.Add(15 * time.Second)
if bw := s.changes["foo"].bandwidth(t1); bw != 8033 {
t.Error("Incorrect bw %d", bw)
t.Errorf("Incorrect bw %d", bw)
}
sup, prev = s.suppress("foo", 10000000, t1)
if sup {
@@ -84,29 +84,29 @@ func TestHistory(t *testing.T) {
t.Errorf("Incorrect first record size %d", s)
}

for i := 1; i < MAX_CHANGE_HISTORY; i++ {
for i := 1; i < MaxChangeHistory; i++ {
h.append(int64(40+i), t0.Add(time.Duration(i)*time.Second))
}

if l := len(h.changes); l != MAX_CHANGE_HISTORY {
if l := len(h.changes); l != MaxChangeHistory {
t.Errorf("Incorrect history length %d", l)
}
if s := h.changes[0].size; s != 40 {
t.Errorf("Incorrect first record size %d", s)
}
if s := h.changes[MAX_CHANGE_HISTORY-1].size; s != 40+MAX_CHANGE_HISTORY-1 {
if s := h.changes[MaxChangeHistory-1].size; s != 40+MaxChangeHistory-1 {
t.Errorf("Incorrect last record size %d", s)
}

h.append(999, t0.Add(time.Duration(999)*time.Second))

if l := len(h.changes); l != MAX_CHANGE_HISTORY {
if l := len(h.changes); l != MaxChangeHistory {
t.Errorf("Incorrect history length %d", l)
}
if s := h.changes[0].size; s != 41 {
t.Errorf("Incorrect first record size %d", s)
}
if s := h.changes[MAX_CHANGE_HISTORY-1].size; s != 999 {
if s := h.changes[MaxChangeHistory-1].size; s != 999 {
t.Errorf("Incorrect last record size %d", s)
}

@@ -25,7 +25,7 @@ func loadCert(dir string) (tls.Certificate, error) {
return tls.LoadX509KeyPair(path.Join(dir, "cert.pem"), path.Join(dir, "key.pem"))
}

func certId(bs []byte) string {
func certID(bs []byte) string {
hf := sha256.New()
hf.Write(bs)
id := hf.Sum(nil)
@@ -34,7 +34,7 @@ type Discoverer struct {
}

var (
ErrIncorrectMagic = errors.New("Incorrect magic number")
ErrIncorrectMagic = errors.New("incorrect magic number")
)

// We tolerate a certain amount of errors because we might be running on
@@ -80,24 +80,75 @@ func (d *Discoverer) sendAnnouncements() {
var errCounter = 0
var err error

remote := &net.UDPAddr{
IP: net.IP{255, 255, 255, 255},
Port: AnnouncementPort,
}

for errCounter < maxErrors {
for _, ipStr := range allBroadcasts() {
var addrStr = ipStr + ":21025"
intfs, err := net.Interfaces()
if err != nil {
log.Printf("discover/listInterfaces: %v; no local announcements", err)
return
}

remote, err := net.ResolveUDPAddr("udp4", addrStr)
if err != nil {
log.Printf("discover/external: %v; no external announcements", err)
return
}
for _, intf := range intfs {
if intf.Flags&(net.FlagBroadcast|net.FlagLoopback) == net.FlagBroadcast {
addrs, err := intf.Addrs()
if err != nil {
log.Println("discover/listAddrs: warning:", err)
errCounter++
continue
}

if Debug {
fmt.Println("send announcement -> ", remote)
}
_, _, err = d.conn.WriteMsgUDP(buf, nil, remote)
if err != nil {
log.Println("discover/write: warning:", err)
errCounter++
} else {
var srcAddr string
for _, addr := range addrs {
if strings.Contains(addr.String(), ".") {
// Found an IPv4 adress
parts := strings.Split(addr.String(), "/")
srcAddr = parts[0]
break
}
}
if len(srcAddr) == 0 {
if Debug {
log.Println("discover: debug: no source address found on interface", intf.Name)
}
continue
}

iaddr, err := net.ResolveUDPAddr("udp4", srcAddr+":0")
if err != nil {
log.Println("discover/resolve: warning:", err)
errCounter++
continue
}

conn, err := net.ListenUDP("udp4", iaddr)
if err != nil {
log.Println("discover/listen: warning:", err)
errCounter++
continue
}

if Debug {
log.Println("discover: debug: send announcement from", conn.LocalAddr(), "to", remote, "on", intf.Name)
}

_, err = conn.WriteTo(buf, remote)
if err != nil {
// Some interfaces don't seem to support broadcast even though the flags claims they do, i.e. vmnet
conn.Close()

if Debug {
log.Println("discover/write: debug:", err)
}

errCounter++
continue
}

conn.Close()
errCounter = 0
}
}
@@ -123,9 +174,9 @@ func (d *Discoverer) sendExtAnnouncements() {

for errCounter < maxErrors {
if Debug {
fmt.Println("send announcement -> ", remote)
log.Println("send announcement -> ", remote)
}
_, _, err = d.conn.WriteMsgUDP(buf, nil, remote)
_, err = d.conn.WriteTo(buf, remote)
if err != nil {
log.Println("discover/write: warning:", err)
errCounter++
@@ -134,7 +185,7 @@ func (d *Discoverer) sendExtAnnouncements() {
}
time.Sleep(d.ExtBroadcastIntv)
}
log.Println("discover/write: %v: stopping due to too many errors:", remote, err)
log.Printf("discover/write: %v: stopping due to too many errors: %v", remote, err)
}

func (d *Discoverer) recvAnnouncements() {
@@ -184,7 +235,6 @@ func (d *Discoverer) recvAnnouncements() {
d.registryLock.Lock()
_, seen := d.registry[pkt.NodeID]
if !seen {
fmt.Println("new node seen, forced announce")
select {
case d.forcedBroadcastTick <- time.Now():
}
@@ -290,38 +340,3 @@ func ipStr(ip []byte) string {
}
return strings.Join(ss, s)
}

func allBroadcasts() []string {
var bcasts = make(map[string]bool)
addrs, err := net.InterfaceAddrs()
if err != nil {
return nil
}

for _, addr := range addrs {
switch {
case strings.HasPrefix(addr.String(), "127."):
// Ignore v4 localhost

case strings.Contains(addr.String(), ":"):
// Ignore all v6, because we need link local multicast there which I haven't implemented

default:
if in, ok := addr.(*net.IPNet); ok {
il := len(in.IP) - 1
ml := len(in.Mask) - 1
for i := range in.Mask {
in.IP[il-i] = in.IP[il-i] | ^in.Mask[ml-i]
}
parts := strings.Split(in.String(), "/")
bcasts[parts[0]] = true
}
}
}

var l []string
for ip := range bcasts {
l = append(l, ip)
}
return l
}
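The rewritten sendAnnouncements drops the precomputed broadcast addresses (the removed allBroadcasts) and instead walks the interfaces: broadcast-capable, non-loopback interfaces are selected, the first IPv4 address on each becomes the source, and the announcement is written from a socket bound to that address towards 255.255.255.255. A compact sketch of that enumeration step, printing instead of sending since the packet format lives elsewhere:

```go
package main

import (
	"fmt"
	"net"
	"strings"
)

func main() {
	intfs, err := net.Interfaces()
	if err != nil {
		fmt.Println("listing interfaces:", err)
		return
	}
	for _, intf := range intfs {
		// Broadcast-capable, non-loopback interfaces only, as in the diff above.
		if intf.Flags&(net.FlagBroadcast|net.FlagLoopback) != net.FlagBroadcast {
			continue
		}
		addrs, err := intf.Addrs()
		if err != nil {
			continue
		}
		var src string
		for _, addr := range addrs {
			if strings.Contains(addr.String(), ".") { // first IPv4 address
				src = strings.Split(addr.String(), "/")[0]
				break
			}
		}
		if src == "" {
			continue
		}
		// The real code binds a UDP socket to src:0 and writes the
		// announcement packet to 255.255.255.255 on the announcement port.
		fmt.Printf("would announce from %s on %s to 255.255.255.255\n", src, intf.Name)
	}
}
```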
files/set.go (new file): 173 lines

@@ -0,0 +1,173 @@
package fileset

import "sync"

type File struct {
Key Key
Modified int64
Flags uint32
Data interface{}
}

type Key struct {
Name string
Version uint32
}

type fileRecord struct {
Usage int
File File
}

type bitset uint64

func (a Key) newerThan(b Key) bool {
return a.Version > b.Version
}

type Set struct {
mutex sync.RWMutex
files map[Key]fileRecord
remoteKey [64]map[string]Key
globalAvailability map[string]bitset
globalKey map[string]Key
}

func NewSet() *Set {
var m = Set{
files: make(map[Key]fileRecord),
globalAvailability: make(map[string]bitset),
globalKey: make(map[string]Key),
}
return &m
}

func (m *Set) AddLocal(fs []File) {
m.mutex.Lock()
m.unlockedAddRemote(0, fs)
m.mutex.Unlock()
}

func (m *Set) SetLocal(fs []File) {
m.mutex.Lock()
m.unlockedSetRemote(0, fs)
m.mutex.Unlock()
}

func (m *Set) AddRemote(cid uint, fs []File) {
if cid < 1 || cid > 63 {
panic("Connection ID must be in the range 1 - 63 inclusive")
}
m.mutex.Lock()
m.unlockedAddRemote(cid, fs)
m.mutex.Unlock()
}

func (m *Set) SetRemote(cid uint, fs []File) {
if cid < 1 || cid > 63 {
panic("Connection ID must be in the range 1 - 63 inclusive")
}
m.mutex.Lock()
m.unlockedSetRemote(cid, fs)
m.mutex.Unlock()
}

func (m *Set) unlockedAddRemote(cid uint, fs []File) {
remFiles := m.remoteKey[cid]
for _, f := range fs {
n := f.Key.Name

if ck, ok := remFiles[n]; ok && ck == f.Key {
// The remote already has exactly this file, skip it
continue
}

remFiles[n] = f.Key

// Keep the block list or increment the usage
if br, ok := m.files[f.Key]; !ok {
m.files[f.Key] = fileRecord{
Usage: 1,
File: f,
}
} else {
br.Usage++
m.files[f.Key] = br
}

// Update global view
gk, ok := m.globalKey[n]
switch {
case ok && f.Key == gk:
av := m.globalAvailability[n]
av |= 1 << cid
m.globalAvailability[n] = av
case f.Key.newerThan(gk):
m.globalKey[n] = f.Key
m.globalAvailability[n] = 1 << cid
}
}
}

func (m *Set) unlockedSetRemote(cid uint, fs []File) {
// Decrement usage for all files belonging to this remote, and remove
// those that are no longer needed.
for _, fk := range m.remoteKey[cid] {
br, ok := m.files[fk]
switch {
case ok && br.Usage == 1:
delete(m.files, fk)
case ok && br.Usage > 1:
br.Usage--
m.files[fk] = br
}
}

// Clear existing remote remoteKey
m.remoteKey[cid] = make(map[string]Key)

// Recalculate global based on all remaining remoteKey
for n := range m.globalKey {
var nk Key // newest key
var na bitset // newest availability

for i, rem := range m.remoteKey {
if rk, ok := rem[n]; ok {
switch {
case rk == nk:
na |= 1 << uint(i)
case rk.newerThan(nk):
nk = rk
na = 1 << uint(i)
}
}
}

if na != 0 {
// Someone had the file
m.globalKey[n] = nk
m.globalAvailability[n] = na
} else {
// Noone had the file
delete(m.globalKey, n)
delete(m.globalAvailability, n)
}
}

// Add new remote remoteKey to the mix
m.unlockedAddRemote(cid, fs)
}

func (m *Set) Need(cid uint) []File {
var fs []File
m.mutex.Lock()

for name, gk := range m.globalKey {
if gk.newerThan(m.remoteKey[cid][name]) {
fs = append(fs, m.files[gk].File)
}
}

m.mutex.Unlock()
return fs
}
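Set keeps one key (name plus version) per file per connection, with connection ID 0 reserved for the local node and 1 through 63 for remotes, and derives the newest-version global view from them. A minimal in-package sketch of the intended call pattern, essentially what TestNeed below exercises; it would live in a _test.go file to run as an example:

```go
package fileset

import "fmt"

// ExampleSet shows the intended call pattern: publish the local index,
// merge a remote's index, then ask what the local node (cid 0) still needs.
func ExampleSet() {
	s := NewSet()

	s.SetLocal([]File{
		{Key: Key{Name: "a", Version: 1000}},
		{Key: Key{Name: "b", Version: 1000}},
	})
	s.SetRemote(1, []File{
		{Key: Key{Name: "b", Version: 1001}}, // newer than ours
		{Key: Key{Name: "c", Version: 1000}}, // we don't have it at all
	})

	for _, f := range s.Need(0) {
		fmt.Println(f.Key.Name, f.Key.Version)
	}
	// Unordered output:
	// b 1001
	// c 1000
}
```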
files/set_test.go (new file): 207 lines

@@ -0,0 +1,207 @@
package fileset

import (
"fmt"
"reflect"
"testing"
)

func TestGlobalSet(t *testing.T) {
m := NewSet()

local := []File{
File{Key{"a", 1000}, 0, 0, nil},
File{Key{"b", 1000}, 0, 0, nil},
File{Key{"c", 1000}, 0, 0, nil},
File{Key{"d", 1000}, 0, 0, nil},
}

remote := []File{
File{Key{"a", 1000}, 0, 0, nil},
File{Key{"b", 1001}, 0, 0, nil},
File{Key{"c", 1002}, 0, 0, nil},
File{Key{"e", 1000}, 0, 0, nil},
}

expectedGlobal := map[string]Key{
"a": local[0].Key,
"b": remote[1].Key,
"c": remote[2].Key,
"d": local[3].Key,
"e": remote[3].Key,
}

m.SetLocal(local)
m.SetRemote(1, remote)

if !reflect.DeepEqual(m.globalKey, expectedGlobal) {
t.Errorf("Global incorrect;\n%v !=\n%v", m.globalKey, expectedGlobal)
}

if lb := len(m.files); lb != 7 {
t.Errorf("Num files incorrect %d != 7\n%v", lb, m.files)
}
}

func BenchmarkSetLocal10k(b *testing.B) {
m := NewSet()

var local []File
for i := 0; i < 10000; i++ {
local = append(local, File{Key{fmt.Sprintf("file%d"), 1000}, 0, 0, nil})
}

var remote []File
for i := 0; i < 10000; i++ {
remote = append(remote, File{Key{fmt.Sprintf("file%d"), 1000}, 0, 0, nil})
}

m.SetRemote(1, remote)

b.ResetTimer()
for i := 0; i < b.N; i++ {
m.SetLocal(local)
}
}

func BenchmarkSetLocal10(b *testing.B) {
m := NewSet()

var local []File
for i := 0; i < 10; i++ {
local = append(local, File{Key{fmt.Sprintf("file%d"), 1000}, 0, 0, nil})
}

var remote []File
for i := 0; i < 10000; i++ {
remote = append(remote, File{Key{fmt.Sprintf("file%d"), 1000}, 0, 0, nil})
}

m.SetRemote(1, remote)

b.ResetTimer()
for i := 0; i < b.N; i++ {
m.SetLocal(local)
}
}

func BenchmarkAddLocal10k(b *testing.B) {
m := NewSet()

var local []File
for i := 0; i < 10000; i++ {
local = append(local, File{Key{fmt.Sprintf("file%d"), 1000}, 0, 0, nil})
}

var remote []File
for i := 0; i < 10000; i++ {
remote = append(remote, File{Key{fmt.Sprintf("file%d"), 1000}, 0, 0, nil})
}

m.SetRemote(1, remote)
m.SetLocal(local)

b.ResetTimer()
for i := 0; i < b.N; i++ {
b.StopTimer()
for j := range local {
local[j].Key.Version++
}
b.StartTimer()
m.AddLocal(local)
}
}

func BenchmarkAddLocal10(b *testing.B) {
m := NewSet()

var local []File
for i := 0; i < 10; i++ {
local = append(local, File{Key{fmt.Sprintf("file%d"), 1000}, 0, 0, nil})
}

var remote []File
for i := 0; i < 10000; i++ {
remote = append(remote, File{Key{fmt.Sprintf("file%d"), 1000}, 0, 0, nil})
}

m.SetRemote(1, remote)
m.SetLocal(local)

b.ResetTimer()
for i := 0; i < b.N; i++ {
for j := range local {
local[j].Key.Version++
}
m.AddLocal(local)
}
}

func TestGlobalReset(t *testing.T) {
m := NewSet()

local := []File{
File{Key{"a", 1000}, 0, 0, nil},
File{Key{"b", 1000}, 0, 0, nil},
File{Key{"c", 1000}, 0, 0, nil},
File{Key{"d", 1000}, 0, 0, nil},
}

remote := []File{
File{Key{"a", 1000}, 0, 0, nil},
File{Key{"b", 1001}, 0, 0, nil},
File{Key{"c", 1002}, 0, 0, nil},
File{Key{"e", 1000}, 0, 0, nil},
}

expectedGlobalKey := map[string]Key{
"a": local[0].Key,
"b": local[1].Key,
"c": local[2].Key,
"d": local[3].Key,
}

m.SetLocal(local)
m.SetRemote(1, remote)
m.SetRemote(1, nil)

if !reflect.DeepEqual(m.globalKey, expectedGlobalKey) {
t.Errorf("Global incorrect;\n%v !=\n%v", m.globalKey, expectedGlobalKey)
}

if lb := len(m.files); lb != 4 {
t.Errorf("Num files incorrect %d != 4\n%v", lb, m.files)
}
}

func TestNeed(t *testing.T) {
m := NewSet()

local := []File{
File{Key{"a", 1000}, 0, 0, nil},
File{Key{"b", 1000}, 0, 0, nil},
File{Key{"c", 1000}, 0, 0, nil},
File{Key{"d", 1000}, 0, 0, nil},
}

remote := []File{
File{Key{"a", 1000}, 0, 0, nil},
File{Key{"b", 1001}, 0, 0, nil},
File{Key{"c", 1002}, 0, 0, nil},
File{Key{"e", 1000}, 0, 0, nil},
}

shouldNeed := []File{
File{Key{"b", 1001}, 0, 0, nil},
File{Key{"c", 1002}, 0, 0, nil},
File{Key{"e", 1000}, 0, 0, nil},
}

m.SetLocal(local)
m.SetRemote(1, remote)

need := m.Need(0)
if !reflect.DeepEqual(need, shouldNeed) {
t.Errorf("Need incorrect;\n%v !=\n%v", need, shouldNeed)
}
}
@@ -46,7 +46,6 @@ func (e *ErrPipe) Write(data []byte) (int, error) {
e.PipeWriter.CloseWithError(e.err)
e.closed = true
return n, e.err
} else {
return e.PipeWriter.Write(data)
}
return e.PipeWriter.Write(data)
}

@@ -31,7 +31,8 @@ const (
)

var (
ErrClusterHash = fmt.Errorf("Configuration error: mismatched cluster hash")
ErrClusterHash = fmt.Errorf("configuration error: mismatched cluster hash")
ErrClosed = errors.New("connection closed")
)

type Model interface {
@@ -56,7 +57,7 @@ type Connection struct {
xw *xdr.Writer
closed bool
awaiting map[int]chan asyncResult
nextId int
nextID int
indexSent map[string]map[string][2]int64
peerOptions map[string]string
myOptions map[string]string
@@ -68,8 +69,6 @@ type Connection struct {
statisticsLock sync.Mutex
}

var ErrClosed = errors.New("Connection closed")

type asyncResult struct {
val []byte
err error
@@ -105,7 +104,7 @@ func NewConnection(nodeID string, reader io.Reader, writer io.Writer, receiver M
c.myOptions = options
go func() {
c.Lock()
header{0, c.nextId, messageTypeOptions}.encodeXDR(c.xw)
header{0, c.nextID, messageTypeOptions}.encodeXDR(c.xw)
var om OptionsMessage
for k, v := range options {
om.Options = append(om.Options, Option{k, v})
@@ -118,7 +117,7 @@ func NewConnection(nodeID string, reader io.Reader, writer io.Writer, receiver M
if err != nil {
log.Println("Warning: Write error during initial handshake:", err)
}
c.nextId++
c.nextID++
c.Unlock()
}()
}
@@ -155,12 +154,12 @@ func (c *Connection) Index(repo string, idx []FileInfo) {
idx = diff
}

header{0, c.nextId, msgType}.encodeXDR(c.xw)
header{0, c.nextID, msgType}.encodeXDR(c.xw)
_, err := IndexMessage{repo, idx}.encodeXDR(c.xw)
if err == nil {
err = c.flush()
}
c.nextId = (c.nextId + 1) & 0xfff
c.nextID = (c.nextID + 1) & 0xfff
c.hasSentIndex = true
c.Unlock()

@@ -178,8 +177,8 @@ func (c *Connection) Request(repo string, name string, offset int64, size int) (
return nil, ErrClosed
}
rc := make(chan asyncResult)
c.awaiting[c.nextId] = rc
header{0, c.nextId, messageTypeRequest}.encodeXDR(c.xw)
c.awaiting[c.nextID] = rc
header{0, c.nextID, messageTypeRequest}.encodeXDR(c.xw)
_, err := RequestMessage{repo, name, uint64(offset), uint32(size)}.encodeXDR(c.xw)
if err == nil {
err = c.flush()
@@ -189,7 +188,7 @@ func (c *Connection) Request(repo string, name string, offset int64, size int) (
c.close(err)
return nil, err
}
c.nextId = (c.nextId + 1) & 0xfff
c.nextID = (c.nextID + 1) & 0xfff
c.Unlock()

res, ok := <-rc
@@ -206,8 +205,8 @@ func (c *Connection) ping() bool {
return false
}
rc := make(chan asyncResult, 1)
c.awaiting[c.nextId] = rc
header{0, c.nextId, messageTypePing}.encodeXDR(c.xw)
c.awaiting[c.nextID] = rc
header{0, c.nextID, messageTypePing}.encodeXDR(c.xw)
err := c.flush()
if err != nil {
c.Unlock()
@@ -218,7 +217,7 @@ func (c *Connection) ping() bool {
c.close(c.xw.Error())
return false
}
c.nextId = (c.nextId + 1) & 0xfff
c.nextID = (c.nextID + 1) & 0xfff
c.Unlock()

res, ok := <-rc
@@ -268,7 +267,7 @@ loop:
break loop
}
if hdr.version != 0 {
c.close(fmt.Errorf("Protocol error: %s: unknown message version %#x", c.ID, hdr.version))
c.close(fmt.Errorf("protocol error: %s: unknown message version %#x", c.id, hdr.version))
break loop
}

@@ -371,7 +370,7 @@ loop:
}

default:
c.close(fmt.Errorf("Protocol error: %s: unknown message type %#x", c.ID, hdr.msgType))
c.close(fmt.Errorf("protocol error: %s: unknown message type %#x", c.id, hdr.msgType))
break loop
}
}
@@ -410,10 +409,10 @@ func (c *Connection) pingerLoop() {
select {
case ok := <-rc:
if !ok {
c.close(fmt.Errorf("Ping failure"))
c.close(fmt.Errorf("ping failure"))
}
case <-time.After(pingTimeout):
c.close(fmt.Errorf("Ping timeout"))
c.close(fmt.Errorf("ping timeout"))
}
}
}
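Beyond the nextId to nextID rename and the lower-cased error strings, the pattern worth noting here is message ID allocation: each outgoing request registers a reply channel under the current ID, and the counter is confined to a 12-bit space by the & 0xfff wraparound. A self-contained sketch of that wraparound:

```go
package main

import "fmt"

func main() {
	awaiting := make(map[int]chan []byte)

	nextID := 0
	alloc := func() int {
		id := nextID
		awaiting[id] = make(chan []byte, 1) // reply is delivered here later
		nextID = (nextID + 1) & 0xfff       // 12-bit ID space, wraps after 4095
		return id
	}

	for i := 0; i < 4097; i++ {
		id := alloc()
		if i >= 4095 {
			fmt.Println("request", i, "got message ID", id) // 4095 -> 4095, 4096 -> 0
		}
		delete(awaiting, id) // pretend the reply arrived
	}
}
```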
@@ -38,7 +38,7 @@ func TestPing(t *testing.T) {
}

func TestPingErr(t *testing.T) {
e := errors.New("Something broke")
e := errors.New("something broke")

for i := 0; i < 12; i++ {
for j := 0; j < 12; j++ {
@@ -64,7 +64,7 @@ func TestPingErr(t *testing.T) {
}

func TestRequestResponseErr(t *testing.T) {
e := errors.New("Something broke")
e := errors.New("something broke")

var pass bool
for i := 0; i < 48; i++ {
@@ -99,16 +99,16 @@ func TestRequestResponseErr(t *testing.T) {
t.Errorf("Incorrect response data %q", string(d))
}
if m0.repo != "default" {
t.Error("Incorrect repo %q", m0.repo)
t.Errorf("Incorrect repo %q", m0.repo)
}
if m0.name != "tn" {
t.Error("Incorrect name %q", m0.name)
t.Errorf("Incorrect name %q", m0.name)
}
if m0.offset != 1234 {
t.Error("Incorrect offset %d", m0.offset)
t.Errorf("Incorrect offset %d", m0.offset)
}
if m0.size != 5678 {
t.Error("Incorrect size %d", m0.size)
t.Errorf("Incorrect size %d", m0.size)
}
t.Logf("Pass at %d+%d bytes", i, j)
pass = true

@@ -5,7 +5,7 @@ import (
"io"
)

var ErrElementSizeExceeded = errors.New("Element size exceeded")
var ErrElementSizeExceeded = errors.New("element size exceeded")

type Reader struct {
r io.Reader