Compare commits

...

32 Commits

Author SHA1 Message Date
Jakob Borg
e6b29988e5 Logo 2014-02-07 22:33:58 +01:00
Jakob Borg
3cb7b8f22b Allow multiple listenAddresses (fixes #52) 2014-02-05 23:17:17 +01:00
Jakob Borg
2297e29502 Give friendly names to nodes (fixes #54) 2014-02-05 22:49:26 +01:00
Jakob Borg
ea41acfff5 Clarify status badges and fix column widths (fixes #53) 2014-02-05 22:42:23 +01:00
Jakob Borg
1aefc50e35 Always show local node, and summarize traffic stats (fixes #43) 2014-02-05 21:30:04 +01:00
Jakob Borg
9bd4fa5008 Make immediate write error only slightly less cryptic (fixes #51) 2014-02-05 20:58:39 +01:00
Jakob Borg
89c2f61b30 Reduce default verbosity now that the GUI is there 2014-02-03 16:22:15 +01:00
Jakob Borg
a1d575894a Edit configuration in GUI; use XML configuration 2014-02-03 15:42:59 +01:00
Jakob Borg
71def3a970 Don't include resource fork crap in builds (fixes #48) 2014-02-01 20:23:02 +01:00
Jakob Borg
13854250b3 Always show self in cluster list (fixes #43) 2014-02-01 11:22:41 +01:00
Jakob Borg
e6078f9449 Streamline build script 2014-02-01 10:10:07 +01:00
Jakob Borg
5980952495 Actually load index cache again (fixes #45) 2014-01-29 22:02:38 +01:00
Jakob Borg
618c376e18 Synchronize zero sized files (fixes #44) 2014-01-29 21:52:27 +01:00
Jakob Borg
d31a126408 CONTRIBUTING.md 2014-01-28 19:10:39 +01:00
Jakob Borg
6d3f8a2c06 Parallell -> parallel (ref #13) 2014-01-26 16:48:20 +01:00
Jakob Borg
b1ba976122 Move auto generated source to a package 2014-01-26 15:02:06 +01:00
Jakob Borg
81d5d1d4a6 Rework config/flags (fixes #13) 2014-01-26 14:45:03 +01:00
Jakob Borg
ea5ef28c5a Performance: improve need computation 2014-01-23 22:20:15 +01:00
Jakob Borg
fc2ebc6cad Performance: make filequeue not suck 2014-01-23 16:39:12 +01:00
Jakob Borg
01096fff6c Add version info to GUI (fixes #41) 2014-01-23 13:13:15 +01:00
Jakob Borg
2ea3558283 Add Options message to protocol 2014-01-23 13:12:45 +01:00
Jakob Borg
20a47695fb Create syncthing.ini template (fixes #39) 2014-01-22 14:28:14 +01:00
Jakob Borg
1dde9ec2d8 New file change suppression algorithm (fixes #30) 2014-01-22 12:52:27 +01:00
Jakob Borg
0841a46055 Don't crash on invalid options 2014-01-22 12:52:15 +01:00
Jakob Borg
84c0749d20 Slightly more compact GUI resources 2014-01-20 23:17:57 +01:00
Jakob Borg
6b02f9e44f Fix GUI files modtime (ish...) 2014-01-20 23:08:29 +01:00
Jakob Borg
84d7452f9e Use embed instead of nrsc, enables 'go get' 2014-01-20 23:01:38 +01:00
Jakob Borg
9b449cb527 Fix windows build (fixes #38) 2014-01-20 23:00:49 +01:00
Jakob Borg
d9ffd359e2 Tweak locking and integration test. 2014-01-20 22:22:27 +01:00
Jakob Borg
b67443eb40 Integration test 2014-01-20 07:38:57 +01:00
Jakob Borg
4ac204b604 Fine grained locking 2014-01-20 07:38:48 +01:00
Jakob Borg
fff50b5472 Delete deadlock 2014-01-14 17:47:27 -07:00
46 changed files with 1900 additions and 9077 deletions

22
CONTRIBUTING.md Normal file
View File

@@ -0,0 +1,22 @@
Please do contribute!
## Building
[See the wiki](https://github.com/calmh/syncthing/wiki/Building)
## Tests
Yes please!
## Style
`go fmt`
## Documentation
[Hack it here](https://github.com/calmh/syncthing/wiki)
## License
MIT

BIN
assets/st-logo.pxm Normal file
View File

Binary file not shown.

6
auto/gui.files.go Normal file
View File

File diff suppressed because one or more lines are too long

View File

@@ -1,24 +1,22 @@
#!/bin/bash
export COPYFILE_DISABLE=true
version=$(git describe --always)
buildDir=dist
if [[ $1 == "-f" ]] ; then
fast=yes
shift
fi
if [[ $fast != yes ]] ; then
go get -d
go test ./...
fi
if [[ -z $1 ]] ; then
go build -ldflags "-X main.Version $version" \
&& nrsc syncthing gui
go build -ldflags "-X main.Version $version"
elif [[ $1 == "embed" ]] ; then
embedder auto gui > auto/gui.files.go \
&& go build -ldflags "-X main.Version $version"
elif [[ $1 == "tar" ]] ; then
go build -ldflags "-X main.Version $version" \
&& nrsc syncthing gui \
&& mkdir syncthing-dist \
&& cp syncthing README.md LICENSE syncthing-dist \
&& tar zcvf syncthing-dist.tar.gz syncthing-dist \
@@ -27,56 +25,27 @@ elif [[ $1 == "all" ]] ; then
rm -rf "$buildDir"
mkdir -p "$buildDir" || exit 1
for goos in darwin linux freebsd ; do
for goarch in amd64 386 ; do
echo "$goos-$goarch"
export GOOS="$goos"
export GOARCH="$goarch"
export name="syncthing-$goos-$goarch"
go build -ldflags "-X main.Version $version" \
&& nrsc syncthing gui \
&& mkdir -p "$name" \
&& cp syncthing "$buildDir/$name" \
&& cp README.md LICENSE "$name" \
&& mv syncthing "$name" \
&& tar zcf "$buildDir/$name.tar.gz" "$name" \
&& rm -r "$name"
done
done
for goos in linux ; do
for goarm in 5 6 7 ; do
for goarch in arm ; do
echo "$goos-${goarch}v$goarm"
export GOARM="$goarm"
export GOOS="$goos"
export GOARCH="$goarch"
export name="syncthing-$goos-${goarch}v$goarm"
go build -ldflags "-X main.Version $version" \
&& nrsc syncthing gui \
&& mkdir -p "$name" \
&& cp syncthing "$buildDir/$name" \
&& cp README.md LICENSE "$name" \
&& mv syncthing "$name" \
&& tar zcf "$buildDir/$name.tar.gz" "$name" \
&& rm -r "$name"
done
done
done
for goos in windows ; do
for goarch in amd64 386 ; do
echo "$goos-$goarch"
export GOOS="$goos"
export GOARCH="$goarch"
export name="syncthing-$goos-$goarch"
go build -ldflags "-X main.Version $version" \
&& nrsc syncthing.exe gui \
&& mkdir -p "$name" \
&& mv syncthing.exe "$buildDir/$name.exe" \
&& cp README.md LICENSE "$name" \
&& zip -qr "$buildDir/$name.zip" "$name" \
&& rm -r "$name"
done
export GOARM=7
for os in darwin-amd64 linux-386 linux-amd64 linux-arm freebsd-386 freebsd-amd64 windows-386 windows-amd64 ; do
echo "$os"
export name="syncthing-$os"
export GOOS=${os%-*}
export GOARCH=${os#*-}
go build -ldflags "-X main.Version $version"
mkdir -p "$name"
cp README.md LICENSE "$name"
case $GOOS in
windows)
cp syncthing.exe "$buildDir/$name.exe"
mv syncthing.exe "$name"
zip -qr "$buildDir/$name.zip" "$name"
;;
*)
cp syncthing "$buildDir/$name"
mv syncthing "$name"
tar zcf "$buildDir/$name.tar.gz" "$name"
;;
esac
rm -r "$name"
done
fi

156
config.go Normal file
View File

@@ -0,0 +1,156 @@
package main
import (
"encoding/xml"
"io"
"reflect"
"strconv"
"strings"
)
type Configuration struct {
Version int `xml:"version,attr" default:"1"`
Repositories []RepositoryConfiguration `xml:"repository"`
Options OptionsConfiguration `xml:"options"`
XMLName xml.Name `xml:"configuration" json:"-"`
}
type RepositoryConfiguration struct {
Directory string `xml:"directory,attr"`
Nodes []NodeConfiguration `xml:"node"`
}
type NodeConfiguration struct {
NodeID string `xml:"id,attr"`
Name string `xml:"name,attr"`
Addresses []string `xml:"address"`
}
type OptionsConfiguration struct {
ListenAddress []string `xml:"listenAddress" default:":22000" ini:"listen-address"`
ReadOnly bool `xml:"readOnly" ini:"read-only"`
AllowDelete bool `xml:"allowDelete" default:"true" ini:"allow-delete"`
FollowSymlinks bool `xml:"followSymlinks" default:"true" ini:"follow-symlinks"`
GUIEnabled bool `xml:"guiEnabled" default:"true" ini:"gui-enabled"`
GUIAddress string `xml:"guiAddress" default:"127.0.0.1:8080" ini:"gui-address"`
GlobalAnnServer string `xml:"globalAnnounceServer" default:"syncthing.nym.se:22025" ini:"global-announce-server"`
GlobalAnnEnabled bool `xml:"globalAnnounceEnabled" default:"true" ini:"global-announce-enabled"`
LocalAnnEnabled bool `xml:"localAnnounceEnabled" default:"true" ini:"local-announce-enabled"`
ParallelRequests int `xml:"parallelRequests" default:"16" ini:"parallel-requests"`
MaxSendKbps int `xml:"maxSendKbps" ini:"max-send-kbps"`
RescanIntervalS int `xml:"rescanIntervalS" default:"60" ini:"rescan-interval"`
ReconnectIntervalS int `xml:"reconnectionIntervalS" default:"60" ini:"reconnection-interval"`
MaxChangeKbps int `xml:"maxChangeKbps" default:"1000" ini:"max-change-bw"`
}
func setDefaults(data interface{}) error {
s := reflect.ValueOf(data).Elem()
t := s.Type()
for i := 0; i < s.NumField(); i++ {
f := s.Field(i)
tag := t.Field(i).Tag
v := tag.Get("default")
if len(v) > 0 {
switch f.Interface().(type) {
case string:
f.SetString(v)
case []string:
rv := reflect.MakeSlice(reflect.TypeOf([]string{}), 1, 1)
rv.Index(0).SetString(v)
f.Set(rv)
case int:
i, err := strconv.ParseInt(v, 10, 64)
if err != nil {
return err
}
f.SetInt(i)
case bool:
f.SetBool(v == "true")
default:
panic(f.Type())
}
}
}
return nil
}
func readConfigINI(m map[string]string, data interface{}) error {
s := reflect.ValueOf(data).Elem()
t := s.Type()
for i := 0; i < s.NumField(); i++ {
f := s.Field(i)
tag := t.Field(i).Tag
name := tag.Get("ini")
if len(name) == 0 {
name = strings.ToLower(t.Field(i).Name)
}
if v, ok := m[name]; ok {
switch f.Interface().(type) {
case string:
f.SetString(v)
case int:
i, err := strconv.ParseInt(v, 10, 64)
if err == nil {
f.SetInt(i)
}
case bool:
f.SetBool(v == "true")
default:
panic(f.Type())
}
}
}
return nil
}
func writeConfigXML(wr io.Writer, cfg Configuration) error {
e := xml.NewEncoder(wr)
e.Indent("", " ")
err := e.Encode(cfg)
if err != nil {
return err
}
_, err = wr.Write([]byte("\n"))
return err
}
func uniqueStrings(ss []string) []string {
var m = make(map[string]bool, len(ss))
for _, s := range ss {
m[s] = true
}
var us = make([]string, 0, len(m))
for k := range m {
us = append(us, k)
}
return us
}
func readConfigXML(rd io.Reader) (Configuration, error) {
var cfg Configuration
setDefaults(&cfg)
setDefaults(&cfg.Options)
var err error
if rd != nil {
err = xml.NewDecoder(rd).Decode(&cfg)
}
cfg.Options.ListenAddress = uniqueStrings(cfg.Options.ListenAddress)
return cfg, err
}

View File

@@ -100,7 +100,6 @@ type Discoverer struct {
MyID string
ListenPort int
BroadcastIntv time.Duration
ExtListenPort int
ExtBroadcastIntv time.Duration
conn *net.UDPConn
@@ -114,7 +113,7 @@ type Discoverer struct {
// When we hit this many errors in succession, we stop.
const maxErrors = 30
func NewDiscoverer(id string, port int, extPort int, extServer string) (*Discoverer, error) {
func NewDiscoverer(id string, port int, extServer string) (*Discoverer, error) {
local4 := &net.UDPAddr{IP: net.IP{0, 0, 0, 0}, Port: AnnouncementPort}
conn, err := net.ListenUDP("udp4", local4)
if err != nil {
@@ -125,7 +124,6 @@ func NewDiscoverer(id string, port int, extPort int, extServer string) (*Discove
MyID: id,
ListenPort: port,
BroadcastIntv: 30 * time.Second,
ExtListenPort: extPort,
ExtBroadcastIntv: 1800 * time.Second,
conn: conn,
@@ -138,7 +136,7 @@ func NewDiscoverer(id string, port int, extPort int, extServer string) (*Discove
if disc.ListenPort > 0 {
disc.sendAnnouncements()
}
if len(disc.extServer) > 0 && disc.ExtListenPort > 0 {
if len(disc.extServer) > 0 {
disc.sendExtAnnouncements()
}
@@ -153,13 +151,13 @@ func (d *Discoverer) sendAnnouncements() {
}
func (d *Discoverer) sendExtAnnouncements() {
extIP, err := net.ResolveUDPAddr("udp", d.extServer+":22025")
extIP, err := net.ResolveUDPAddr("udp", d.extServer)
if err != nil {
log.Printf("discover/external: %v; no external announcements", err)
return
}
buf := EncodePacket(Packet{AnnouncementMagic, uint16(d.ExtListenPort), d.MyID, nil})
buf := EncodePacket(Packet{AnnouncementMagic, uint16(22000), d.MyID, nil})
go d.writeAnnouncements(buf, extIP, d.ExtBroadcastIntv)
}
@@ -213,7 +211,7 @@ func (d *Discoverer) recvAnnouncements() {
}
func (d *Discoverer) externalLookup(node string) (string, bool) {
extIP, err := net.ResolveUDPAddr("udp", d.extServer+":22025")
extIP, err := net.ResolveUDPAddr("udp", d.extServer)
if err != nil {
log.Printf("discover/external: %v; no external lookup", err)
return "", false

61
gui.go
View File

@@ -2,16 +2,11 @@ package main
import (
"encoding/json"
"fmt"
"io"
"log"
"mime"
"net/http"
"path/filepath"
"runtime"
"sync"
"bitbucket.org/tebeka/nrsc"
"github.com/calmh/syncthing/model"
"github.com/codegangsta/martini"
)
@@ -26,9 +21,11 @@ func startGUI(addr string, m *model.Model) {
router.Get("/rest/need", restGetNeed)
router.Get("/rest/system", restGetSystem)
router.Post("/rest/config", restPostConfig)
go func() {
mr := martini.New()
mr.Use(nrscStatic("gui"))
mr.Use(embeddedStatic())
mr.Use(martini.Recovery())
mr.Action(router.Handle)
mr.Map(m)
@@ -37,7 +34,6 @@ func startGUI(addr string, m *model.Model) {
warnln("GUI not possible:", err)
}
}()
}
func getRoot(w http.ResponseWriter, r *http.Request) {
@@ -74,12 +70,16 @@ func restGetConnections(m *model.Model, w http.ResponseWriter) {
}
func restGetConfig(w http.ResponseWriter) {
var res = make(map[string]interface{})
res["myID"] = myID
res["repository"] = config.OptionMap("repository")
res["nodes"] = config.OptionMap("nodes")
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(res)
json.NewEncoder(w).Encode(cfg)
}
func restPostConfig(req *http.Request) {
err := json.NewDecoder(req.Body).Decode(&cfg)
if err != nil {
log.Println(err)
} else {
saveConfig()
}
}
type guiFile model.File
@@ -113,6 +113,7 @@ func restGetSystem(w http.ResponseWriter) {
runtime.ReadMemStats(&m)
res := make(map[string]interface{})
res["myID"] = myID
res["goroutines"] = runtime.NumGoroutine()
res["alloc"] = m.Alloc
res["sys"] = m.Sys
@@ -123,37 +124,3 @@ func restGetSystem(w http.ResponseWriter) {
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(res)
}
func nrscStatic(path string) interface{} {
if err := nrsc.Initialize(); err != nil {
panic("Unable to initialize nrsc: " + err.Error())
}
return func(res http.ResponseWriter, req *http.Request, log *log.Logger) {
file := req.URL.Path
// nrsc expects there not to be a leading slash
if file[0] == '/' {
file = file[1:]
}
f := nrsc.Get(file)
if f == nil {
return
}
rdr, err := f.Open()
if err != nil {
http.Error(res, "Internal Server Error", http.StatusInternalServerError)
}
defer rdr.Close()
mtype := mime.TypeByExtension(filepath.Ext(req.URL.Path))
if len(mtype) != 0 {
res.Header().Set("Content-Type", mtype)
}
res.Header().Set("Content-Size", fmt.Sprintf("%d", f.Size()))
res.Header().Set("Last-Modified", f.ModTime().UTC().Format(http.TimeFormat))
io.Copy(res, rdr)
}
}

View File

@@ -4,6 +4,28 @@ syncthing.controller('SyncthingCtrl', function ($scope, $http) {
var prevDate = 0;
var modelGetOK = true;
$scope.connections = {};
$scope.config = {};
$scope.myID = "";
$scope.nodes = [];
// Strings before bools look better
$scope.settings = [
{id: 'ListenStr', descr:"Sync Protocol Listen Addresses", type: 'string', restart: true},
{id: 'GUIAddress', descr: "GUI Listen Address", type: 'string', restart: true},
{id: 'MaxSendKbps', descr: "Outgoing Rate Limit (KBps)", type: 'string', restart: true},
{id: 'RescanIntervalS', descr: "Rescan Interval (s)", type: 'string', restart: true},
{id: 'ReconnectIntervalS', descr: "Reconnect Interval (s)", type: 'string', restart: true},
{id: 'ParallelRequests', descr: "Max Outstanding Requests", type: 'string', restart: true},
{id: 'MaxChangeKbps', descr: "Max File Change Rate (KBps)", type: 'string', restart: true},
{id: 'ReadOnly', descr: "Read Only", type: 'bool', restart: true},
{id: 'AllowDelete', descr: "Allow Delete", type: 'bool', restart: true},
{id: 'FollowSymlinks', descr: "Follow Symlinks", type: 'bool', restart: true},
{id: 'GlobalAnnEnabled', descr: "Global Announce", type: 'bool', restart: true},
{id: 'LocalAnnEnabled', descr: "Local Announce", type: 'bool', restart: true},
];
function modelGetSucceeded() {
if (!modelGetOK) {
$('#networkError').modal('hide');
@@ -21,8 +43,23 @@ syncthing.controller('SyncthingCtrl', function ($scope, $http) {
$http.get("/rest/version").success(function (data) {
$scope.version = data;
});
$http.get("/rest/config").success(function (data) {
$scope.config = data;
$http.get("/rest/system").success(function (data) {
$scope.system = data;
$scope.myID = data.myID;
$http.get("/rest/config").success(function (data) {
$scope.config = data;
$scope.config.Options.ListenStr = $scope.config.Options.ListenAddress.join(", ")
var nodes = $scope.config.Repositories[0].Nodes;
nodes = nodes.filter(function (x) { return x.NodeID != $scope.myID; });
nodes.sort(function (a, b) {
if (a.NodeID < b.NodeID)
return -1;
return a.NodeID > b.NodeID;
})
$scope.nodes = nodes;
});
});
$scope.refresh = function () {
@@ -40,6 +77,8 @@ syncthing.controller('SyncthingCtrl', function ($scope, $http) {
var td = (now - prevDate) / 1000;
prevDate = now;
$scope.inbps = 0
$scope.outbps = 0
for (var id in data) {
try {
data[id].inbps = Math.max(0, 8 * (data[id].InBytesTotal - $scope.connections[id].InBytesTotal) / td);
@@ -48,6 +87,8 @@ syncthing.controller('SyncthingCtrl', function ($scope, $http) {
data[id].inbps = 0;
data[id].outbps = 0;
}
$scope.inbps += data[id].outbps;
$scope.outbps += data[id].inbps;
}
$scope.connections = data;
});
@@ -70,6 +111,131 @@ syncthing.controller('SyncthingCtrl', function ($scope, $http) {
});
};
$scope.nodeIcon = function (nodeCfg) {
if ($scope.connections[nodeCfg.NodeID]) {
return "ok";
}
return "minus";
};
$scope.nodeStatus = function (nodeCfg) {
if ($scope.connections[nodeCfg.NodeID]) {
return "Connected";
}
return "Disconnected";
};
$scope.nodeIcon = function (nodeCfg) {
if ($scope.connections[nodeCfg.NodeID]) {
return "ok";
}
return "minus";
};
$scope.nodeClass = function (nodeCfg) {
var conn = $scope.connections[nodeCfg.NodeID];
if (conn) {
return "success";
}
return "info";
};
$scope.nodeAddr = function (nodeCfg) {
var conn = $scope.connections[nodeCfg.NodeID];
if (conn) {
return conn.Address;
}
return nodeCfg.Addresses.join(", ");
};
$scope.nodeVer = function (nodeCfg) {
if (nodeCfg.NodeID === $scope.myID) {
return $scope.version;
}
var conn = $scope.connections[nodeCfg.NodeID];
if (conn) {
return conn.ClientVersion;
}
return "";
};
$scope.nodeName = function (nodeCfg) {
if (nodeCfg.Name) {
return nodeCfg.Name;
}
return nodeCfg.NodeID.substr(0, 6);
};
$scope.saveSettings = function () {
$scope.config.Options.ListenAddress = $scope.config.Options.ListenStr.split(',').map(function (x) { return x.trim(); });
$http.post('/rest/config', JSON.stringify($scope.config), {headers: {'Content-Type': 'application/json'}});
$('#settingsTable').collapse('hide');
};
$scope.editNode = function (nodeCfg) {
$scope.currentNode = nodeCfg;
$scope.editingExisting = true;
$scope.currentNode.AddressesStr = nodeCfg.Addresses.join(", ")
$('#editNode').modal({backdrop: 'static', keyboard: false});
};
$scope.addNode = function () {
$scope.currentNode = {NodeID: "", AddressesStr: "dynamic"};
$scope.editingExisting = false;
$('#editNode').modal({backdrop: 'static', keyboard: false});
};
$scope.deleteNode = function () {
$('#editNode').modal('hide');
if (!$scope.editingExisting)
return;
var newNodes = [];
for (var i = 0; i < $scope.nodes.length; i++) {
if ($scope.nodes[i].NodeID !== $scope.currentNode.NodeID) {
newNodes.push($scope.nodes[i]);
}
}
$scope.nodes = newNodes;
$scope.config.Repositories[0].Nodes = newNodes;
$http.post('/rest/config', JSON.stringify($scope.config), {headers: {'Content-Type': 'application/json'}})
}
$scope.saveNode = function () {
$('#editNode').modal('hide');
nodeCfg = $scope.currentNode;
nodeCfg.Addresses = nodeCfg.AddressesStr.split(',').map(function (x) { return x.trim(); });
var done = false;
for (var i = 0; i < $scope.nodes.length; i++) {
if ($scope.nodes[i].NodeID === nodeCfg.NodeID) {
$scope.nodes[i] = nodeCfg;
done = true;
break;
}
}
if (!done) {
$scope.nodes.push(nodeCfg);
}
$scope.nodes.sort(function (a, b) {
if (a.NodeID < b.NodeID)
return -1;
return a.NodeID > b.NodeID;
})
$scope.config.Repositories[0].Nodes = $scope.nodes;
$http.post('/rest/config', JSON.stringify($scope.config), {headers: {'Content-Type': 'application/json'}})
};
$scope.refresh();
setInterval($scope.refresh, 10000);
});
@@ -90,7 +256,7 @@ syncthing.filter('natural', function() {
syncthing.filter('binary', function() {
return function(input) {
if (input === undefined) {
return '- '
return '0 '
}
if (input > 1024 * 1024 * 1024) {
input /= 1024 * 1024 * 1024;
@@ -111,7 +277,7 @@ syncthing.filter('binary', function() {
syncthing.filter('metric', function() {
return function(input) {
if (input === undefined) {
return '- '
return '0 '
}
if (input > 1000 * 1000 * 1000) {
input /= 1000 * 1000 * 1000;
@@ -143,3 +309,15 @@ syncthing.filter('alwaysNumber', function() {
return input;
}
});
syncthing.directive('optionEditor', function() {
return {
restrict: 'C',
replace: true,
transclude: true,
scope: {
setting: '=setting',
},
template: '<input type="text" ng-model="config.Options[setting.id]"></input>',
};
})

View File

@@ -1,397 +0,0 @@
/*!
* Bootstrap v3.0.3 (http://getbootstrap.com)
* Copyright 2013 Twitter, Inc.
* Licensed under http://www.apache.org/licenses/LICENSE-2.0
*/
.btn-default,
.btn-primary,
.btn-success,
.btn-info,
.btn-warning,
.btn-danger {
text-shadow: 0 -1px 0 rgba(0, 0, 0, 0.2);
-webkit-box-shadow: inset 0 1px 0 rgba(255, 255, 255, 0.15), 0 1px 1px rgba(0, 0, 0, 0.075);
box-shadow: inset 0 1px 0 rgba(255, 255, 255, 0.15), 0 1px 1px rgba(0, 0, 0, 0.075);
}
.btn-default:active,
.btn-primary:active,
.btn-success:active,
.btn-info:active,
.btn-warning:active,
.btn-danger:active,
.btn-default.active,
.btn-primary.active,
.btn-success.active,
.btn-info.active,
.btn-warning.active,
.btn-danger.active {
-webkit-box-shadow: inset 0 3px 5px rgba(0, 0, 0, 0.125);
box-shadow: inset 0 3px 5px rgba(0, 0, 0, 0.125);
}
.btn:active,
.btn.active {
background-image: none;
}
.btn-default {
text-shadow: 0 1px 0 #fff;
background-image: -webkit-linear-gradient(top, #ffffff 0%, #e0e0e0 100%);
background-image: linear-gradient(to bottom, #ffffff 0%, #e0e0e0 100%);
background-repeat: repeat-x;
border-color: #dbdbdb;
border-color: #ccc;
filter: progid:DXImageTransform.Microsoft.gradient(startColorstr='#ffffffff', endColorstr='#ffe0e0e0', GradientType=0);
filter: progid:DXImageTransform.Microsoft.gradient(enabled=false);
}
.btn-default:hover,
.btn-default:focus {
background-color: #e0e0e0;
background-position: 0 -15px;
}
.btn-default:active,
.btn-default.active {
background-color: #e0e0e0;
border-color: #dbdbdb;
}
.btn-primary {
background-image: -webkit-linear-gradient(top, #428bca 0%, #2d6ca2 100%);
background-image: linear-gradient(to bottom, #428bca 0%, #2d6ca2 100%);
background-repeat: repeat-x;
border-color: #2b669a;
filter: progid:DXImageTransform.Microsoft.gradient(startColorstr='#ff428bca', endColorstr='#ff2d6ca2', GradientType=0);
filter: progid:DXImageTransform.Microsoft.gradient(enabled=false);
}
.btn-primary:hover,
.btn-primary:focus {
background-color: #2d6ca2;
background-position: 0 -15px;
}
.btn-primary:active,
.btn-primary.active {
background-color: #2d6ca2;
border-color: #2b669a;
}
.btn-success {
background-image: -webkit-linear-gradient(top, #5cb85c 0%, #419641 100%);
background-image: linear-gradient(to bottom, #5cb85c 0%, #419641 100%);
background-repeat: repeat-x;
border-color: #3e8f3e;
filter: progid:DXImageTransform.Microsoft.gradient(startColorstr='#ff5cb85c', endColorstr='#ff419641', GradientType=0);
filter: progid:DXImageTransform.Microsoft.gradient(enabled=false);
}
.btn-success:hover,
.btn-success:focus {
background-color: #419641;
background-position: 0 -15px;
}
.btn-success:active,
.btn-success.active {
background-color: #419641;
border-color: #3e8f3e;
}
.btn-warning {
background-image: -webkit-linear-gradient(top, #f0ad4e 0%, #eb9316 100%);
background-image: linear-gradient(to bottom, #f0ad4e 0%, #eb9316 100%);
background-repeat: repeat-x;
border-color: #e38d13;
filter: progid:DXImageTransform.Microsoft.gradient(startColorstr='#fff0ad4e', endColorstr='#ffeb9316', GradientType=0);
filter: progid:DXImageTransform.Microsoft.gradient(enabled=false);
}
.btn-warning:hover,
.btn-warning:focus {
background-color: #eb9316;
background-position: 0 -15px;
}
.btn-warning:active,
.btn-warning.active {
background-color: #eb9316;
border-color: #e38d13;
}
.btn-danger {
background-image: -webkit-linear-gradient(top, #d9534f 0%, #c12e2a 100%);
background-image: linear-gradient(to bottom, #d9534f 0%, #c12e2a 100%);
background-repeat: repeat-x;
border-color: #b92c28;
filter: progid:DXImageTransform.Microsoft.gradient(startColorstr='#ffd9534f', endColorstr='#ffc12e2a', GradientType=0);
filter: progid:DXImageTransform.Microsoft.gradient(enabled=false);
}
.btn-danger:hover,
.btn-danger:focus {
background-color: #c12e2a;
background-position: 0 -15px;
}
.btn-danger:active,
.btn-danger.active {
background-color: #c12e2a;
border-color: #b92c28;
}
.btn-info {
background-image: -webkit-linear-gradient(top, #5bc0de 0%, #2aabd2 100%);
background-image: linear-gradient(to bottom, #5bc0de 0%, #2aabd2 100%);
background-repeat: repeat-x;
border-color: #28a4c9;
filter: progid:DXImageTransform.Microsoft.gradient(startColorstr='#ff5bc0de', endColorstr='#ff2aabd2', GradientType=0);
filter: progid:DXImageTransform.Microsoft.gradient(enabled=false);
}
.btn-info:hover,
.btn-info:focus {
background-color: #2aabd2;
background-position: 0 -15px;
}
.btn-info:active,
.btn-info.active {
background-color: #2aabd2;
border-color: #28a4c9;
}
.thumbnail,
.img-thumbnail {
-webkit-box-shadow: 0 1px 2px rgba(0, 0, 0, 0.075);
box-shadow: 0 1px 2px rgba(0, 0, 0, 0.075);
}
.dropdown-menu > li > a:hover,
.dropdown-menu > li > a:focus {
background-color: #e8e8e8;
background-image: -webkit-linear-gradient(top, #f5f5f5 0%, #e8e8e8 100%);
background-image: linear-gradient(to bottom, #f5f5f5 0%, #e8e8e8 100%);
background-repeat: repeat-x;
filter: progid:DXImageTransform.Microsoft.gradient(startColorstr='#fff5f5f5', endColorstr='#ffe8e8e8', GradientType=0);
}
.dropdown-menu > .active > a,
.dropdown-menu > .active > a:hover,
.dropdown-menu > .active > a:focus {
background-color: #357ebd;
background-image: -webkit-linear-gradient(top, #428bca 0%, #357ebd 100%);
background-image: linear-gradient(to bottom, #428bca 0%, #357ebd 100%);
background-repeat: repeat-x;
filter: progid:DXImageTransform.Microsoft.gradient(startColorstr='#ff428bca', endColorstr='#ff357ebd', GradientType=0);
}
.navbar-default {
background-image: -webkit-linear-gradient(top, #ffffff 0%, #f8f8f8 100%);
background-image: linear-gradient(to bottom, #ffffff 0%, #f8f8f8 100%);
background-repeat: repeat-x;
border-radius: 4px;
filter: progid:DXImageTransform.Microsoft.gradient(startColorstr='#ffffffff', endColorstr='#fff8f8f8', GradientType=0);
filter: progid:DXImageTransform.Microsoft.gradient(enabled=false);
-webkit-box-shadow: inset 0 1px 0 rgba(255, 255, 255, 0.15), 0 1px 5px rgba(0, 0, 0, 0.075);
box-shadow: inset 0 1px 0 rgba(255, 255, 255, 0.15), 0 1px 5px rgba(0, 0, 0, 0.075);
}
.navbar-default .navbar-nav > .active > a {
background-image: -webkit-linear-gradient(top, #ebebeb 0%, #f3f3f3 100%);
background-image: linear-gradient(to bottom, #ebebeb 0%, #f3f3f3 100%);
background-repeat: repeat-x;
filter: progid:DXImageTransform.Microsoft.gradient(startColorstr='#ffebebeb', endColorstr='#fff3f3f3', GradientType=0);
-webkit-box-shadow: inset 0 3px 9px rgba(0, 0, 0, 0.075);
box-shadow: inset 0 3px 9px rgba(0, 0, 0, 0.075);
}
.navbar-brand,
.navbar-nav > li > a {
text-shadow: 0 1px 0 rgba(255, 255, 255, 0.25);
}
.navbar-inverse {
background-image: -webkit-linear-gradient(top, #3c3c3c 0%, #222222 100%);
background-image: linear-gradient(to bottom, #3c3c3c 0%, #222222 100%);
background-repeat: repeat-x;
filter: progid:DXImageTransform.Microsoft.gradient(startColorstr='#ff3c3c3c', endColorstr='#ff222222', GradientType=0);
filter: progid:DXImageTransform.Microsoft.gradient(enabled=false);
}
.navbar-inverse .navbar-nav > .active > a {
background-image: -webkit-linear-gradient(top, #222222 0%, #282828 100%);
background-image: linear-gradient(to bottom, #222222 0%, #282828 100%);
background-repeat: repeat-x;
filter: progid:DXImageTransform.Microsoft.gradient(startColorstr='#ff222222', endColorstr='#ff282828', GradientType=0);
-webkit-box-shadow: inset 0 3px 9px rgba(0, 0, 0, 0.25);
box-shadow: inset 0 3px 9px rgba(0, 0, 0, 0.25);
}
.navbar-inverse .navbar-brand,
.navbar-inverse .navbar-nav > li > a {
text-shadow: 0 -1px 0 rgba(0, 0, 0, 0.25);
}
.navbar-static-top,
.navbar-fixed-top,
.navbar-fixed-bottom {
border-radius: 0;
}
.alert {
text-shadow: 0 1px 0 rgba(255, 255, 255, 0.2);
-webkit-box-shadow: inset 0 1px 0 rgba(255, 255, 255, 0.25), 0 1px 2px rgba(0, 0, 0, 0.05);
box-shadow: inset 0 1px 0 rgba(255, 255, 255, 0.25), 0 1px 2px rgba(0, 0, 0, 0.05);
}
.alert-success {
background-image: -webkit-linear-gradient(top, #dff0d8 0%, #c8e5bc 100%);
background-image: linear-gradient(to bottom, #dff0d8 0%, #c8e5bc 100%);
background-repeat: repeat-x;
border-color: #b2dba1;
filter: progid:DXImageTransform.Microsoft.gradient(startColorstr='#ffdff0d8', endColorstr='#ffc8e5bc', GradientType=0);
}
.alert-info {
background-image: -webkit-linear-gradient(top, #d9edf7 0%, #b9def0 100%);
background-image: linear-gradient(to bottom, #d9edf7 0%, #b9def0 100%);
background-repeat: repeat-x;
border-color: #9acfea;
filter: progid:DXImageTransform.Microsoft.gradient(startColorstr='#ffd9edf7', endColorstr='#ffb9def0', GradientType=0);
}
.alert-warning {
background-image: -webkit-linear-gradient(top, #fcf8e3 0%, #f8efc0 100%);
background-image: linear-gradient(to bottom, #fcf8e3 0%, #f8efc0 100%);
background-repeat: repeat-x;
border-color: #f5e79e;
filter: progid:DXImageTransform.Microsoft.gradient(startColorstr='#fffcf8e3', endColorstr='#fff8efc0', GradientType=0);
}
.alert-danger {
background-image: -webkit-linear-gradient(top, #f2dede 0%, #e7c3c3 100%);
background-image: linear-gradient(to bottom, #f2dede 0%, #e7c3c3 100%);
background-repeat: repeat-x;
border-color: #dca7a7;
filter: progid:DXImageTransform.Microsoft.gradient(startColorstr='#fff2dede', endColorstr='#ffe7c3c3', GradientType=0);
}
.progress {
background-image: -webkit-linear-gradient(top, #ebebeb 0%, #f5f5f5 100%);
background-image: linear-gradient(to bottom, #ebebeb 0%, #f5f5f5 100%);
background-repeat: repeat-x;
filter: progid:DXImageTransform.Microsoft.gradient(startColorstr='#ffebebeb', endColorstr='#fff5f5f5', GradientType=0);
}
.progress-bar {
background-image: -webkit-linear-gradient(top, #428bca 0%, #3071a9 100%);
background-image: linear-gradient(to bottom, #428bca 0%, #3071a9 100%);
background-repeat: repeat-x;
filter: progid:DXImageTransform.Microsoft.gradient(startColorstr='#ff428bca', endColorstr='#ff3071a9', GradientType=0);
}
.progress-bar-success {
background-image: -webkit-linear-gradient(top, #5cb85c 0%, #449d44 100%);
background-image: linear-gradient(to bottom, #5cb85c 0%, #449d44 100%);
background-repeat: repeat-x;
filter: progid:DXImageTransform.Microsoft.gradient(startColorstr='#ff5cb85c', endColorstr='#ff449d44', GradientType=0);
}
.progress-bar-info {
background-image: -webkit-linear-gradient(top, #5bc0de 0%, #31b0d5 100%);
background-image: linear-gradient(to bottom, #5bc0de 0%, #31b0d5 100%);
background-repeat: repeat-x;
filter: progid:DXImageTransform.Microsoft.gradient(startColorstr='#ff5bc0de', endColorstr='#ff31b0d5', GradientType=0);
}
.progress-bar-warning {
background-image: -webkit-linear-gradient(top, #f0ad4e 0%, #ec971f 100%);
background-image: linear-gradient(to bottom, #f0ad4e 0%, #ec971f 100%);
background-repeat: repeat-x;
filter: progid:DXImageTransform.Microsoft.gradient(startColorstr='#fff0ad4e', endColorstr='#ffec971f', GradientType=0);
}
.progress-bar-danger {
background-image: -webkit-linear-gradient(top, #d9534f 0%, #c9302c 100%);
background-image: linear-gradient(to bottom, #d9534f 0%, #c9302c 100%);
background-repeat: repeat-x;
filter: progid:DXImageTransform.Microsoft.gradient(startColorstr='#ffd9534f', endColorstr='#ffc9302c', GradientType=0);
}
.list-group {
border-radius: 4px;
-webkit-box-shadow: 0 1px 2px rgba(0, 0, 0, 0.075);
box-shadow: 0 1px 2px rgba(0, 0, 0, 0.075);
}
.list-group-item.active,
.list-group-item.active:hover,
.list-group-item.active:focus {
text-shadow: 0 -1px 0 #3071a9;
background-image: -webkit-linear-gradient(top, #428bca 0%, #3278b3 100%);
background-image: linear-gradient(to bottom, #428bca 0%, #3278b3 100%);
background-repeat: repeat-x;
border-color: #3278b3;
filter: progid:DXImageTransform.Microsoft.gradient(startColorstr='#ff428bca', endColorstr='#ff3278b3', GradientType=0);
}
.panel {
-webkit-box-shadow: 0 1px 2px rgba(0, 0, 0, 0.05);
box-shadow: 0 1px 2px rgba(0, 0, 0, 0.05);
}
.panel-default > .panel-heading {
background-image: -webkit-linear-gradient(top, #f5f5f5 0%, #e8e8e8 100%);
background-image: linear-gradient(to bottom, #f5f5f5 0%, #e8e8e8 100%);
background-repeat: repeat-x;
filter: progid:DXImageTransform.Microsoft.gradient(startColorstr='#fff5f5f5', endColorstr='#ffe8e8e8', GradientType=0);
}
.panel-primary > .panel-heading {
background-image: -webkit-linear-gradient(top, #428bca 0%, #357ebd 100%);
background-image: linear-gradient(to bottom, #428bca 0%, #357ebd 100%);
background-repeat: repeat-x;
filter: progid:DXImageTransform.Microsoft.gradient(startColorstr='#ff428bca', endColorstr='#ff357ebd', GradientType=0);
}
.panel-success > .panel-heading {
background-image: -webkit-linear-gradient(top, #dff0d8 0%, #d0e9c6 100%);
background-image: linear-gradient(to bottom, #dff0d8 0%, #d0e9c6 100%);
background-repeat: repeat-x;
filter: progid:DXImageTransform.Microsoft.gradient(startColorstr='#ffdff0d8', endColorstr='#ffd0e9c6', GradientType=0);
}
.panel-info > .panel-heading {
background-image: -webkit-linear-gradient(top, #d9edf7 0%, #c4e3f3 100%);
background-image: linear-gradient(to bottom, #d9edf7 0%, #c4e3f3 100%);
background-repeat: repeat-x;
filter: progid:DXImageTransform.Microsoft.gradient(startColorstr='#ffd9edf7', endColorstr='#ffc4e3f3', GradientType=0);
}
.panel-warning > .panel-heading {
background-image: -webkit-linear-gradient(top, #fcf8e3 0%, #faf2cc 100%);
background-image: linear-gradient(to bottom, #fcf8e3 0%, #faf2cc 100%);
background-repeat: repeat-x;
filter: progid:DXImageTransform.Microsoft.gradient(startColorstr='#fffcf8e3', endColorstr='#fffaf2cc', GradientType=0);
}
.panel-danger > .panel-heading {
background-image: -webkit-linear-gradient(top, #f2dede 0%, #ebcccc 100%);
background-image: linear-gradient(to bottom, #f2dede 0%, #ebcccc 100%);
background-repeat: repeat-x;
filter: progid:DXImageTransform.Microsoft.gradient(startColorstr='#fff2dede', endColorstr='#ffebcccc', GradientType=0);
}
.well {
background-image: -webkit-linear-gradient(top, #e8e8e8 0%, #f5f5f5 100%);
background-image: linear-gradient(to bottom, #e8e8e8 0%, #f5f5f5 100%);
background-repeat: repeat-x;
border-color: #dcdcdc;
filter: progid:DXImageTransform.Microsoft.gradient(startColorstr='#ffe8e8e8', endColorstr='#fff5f5f5', GradientType=0);
-webkit-box-shadow: inset 0 1px 3px rgba(0, 0, 0, 0.05), 0 1px 0 rgba(255, 255, 255, 0.1);
box-shadow: inset 0 1px 3px rgba(0, 0, 0, 0.05), 0 1px 0 rgba(255, 255, 255, 0.1);
}

8
gui/bootstrap/css/bootstrap-theme.min.css vendored Normal file → Executable file
View File

File diff suppressed because one or more lines are too long

View File

File diff suppressed because it is too large Load Diff

View File

File diff suppressed because one or more lines are too long

0
gui/bootstrap/fonts/glyphicons-halflings-regular.eot Normal file → Executable file
View File

0
gui/bootstrap/fonts/glyphicons-halflings-regular.svg Normal file → Executable file
View File

Before

Width:  |  Height:  |  Size: 61 KiB

After

Width:  |  Height:  |  Size: 61 KiB

0
gui/bootstrap/fonts/glyphicons-halflings-regular.ttf Normal file → Executable file
View File

0
gui/bootstrap/fonts/glyphicons-halflings-regular.woff Normal file → Executable file
View File

View File

File diff suppressed because it is too large Load Diff

8
gui/bootstrap/js/bootstrap.min.js vendored Normal file → Executable file
View File

File diff suppressed because one or more lines are too long

BIN
gui/favicon.png Normal file
View File

Binary file not shown.

After

Width:  |  Height:  |  Size: 6.8 KiB

View File

@@ -6,10 +6,10 @@
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<meta name="description" content="">
<meta name="author" content="">
<link rel="shortcut icon" href="../../docs-assets/ico/favicon.png">
<link rel="shortcut icon" href="favicon.png">
<title>syncthing</title>
<link href="bootstrap/css/bootstrap.css" rel="stylesheet">
<link href="bootstrap/css/bootstrap.min.css" rel="stylesheet">
<style type="text/css">
html, body {
height: 100%;
@@ -35,14 +35,19 @@ html, body {
.text-monospace {
font-family: monospace;
}
.table-condensed>thead>tr>th, .table-condensed>tbody>tr>th, .table-condensed>tfoot>tr>th, .table-condensed>thead>tr>td, .table-condensed>tbody>tr>td, .table-condensed>tfoot>tr>td {
border-top: none;
}
</style>
</head>
<body ng-controller="SyncthingCtrl">
<div id="wrap">
<div class="container">
<div class="header">
<h3 class="text-muted">syncthing</h3>
<div class="page-header">
<h1 class="text-muted"><img width="64" height="64" src="st-logo-128.png"> syncthing</h1>
</div>
<div class="row">
@@ -63,6 +68,83 @@ html, body {
</div>
</div>
<div class="row">
<div class="col-md-12">
<div class="panel panel-info">
<div class="panel-heading"><h3 class="panel-title">Cluster</h3></div>
<table class="table table-condensed">
<tbody>
<!-- myself -->
<tr class="text-muted">
<td style="width:13%">
<span class="label label-default">
<span class="glyphicon glyphicon-ok"></span> This node
</span>
</td>
<td style="width:12%">
<span class="text-monospace">{{myID | short}}</span>
</td>
<td style="width:20%">{{version}}</td>
<td style="width:25%"></td>
<td style="width:10%" class="text-right">
<span ng-show="nodeCfg.NodeID != myID">
{{inbps | metric}}bps
<span class="text-muted glyphicon glyphicon-chevron-down"></span>
</span>
</td>
<td style="width:10%" class="text-right">
<span ng-show="nodeCfg.NodeID != myID">
{{outbps | metric}}bps
<span class="text-muted glyphicon glyphicon-chevron-up"></span>
</span>
</td>
<td style="width:10%" class="text-right"></td>
</tr>
<!-- all other nodes -->
<tr ng-repeat="nodeCfg in nodes">
<td>
<span class="label label-{{nodeClass(nodeCfg)}}">
<span class="glyphicon glyphicon-{{nodeIcon(nodeCfg)}}"></span> {{nodeStatus(nodeCfg)}}
</span>
</td>
<td>
<span class="text-monospace">{{nodeName(nodeCfg)}}</span>
</td>
<td>
{{nodeVer(nodeCfg)}}
</td>
<td>
{{nodeAddr(nodeCfg)}}
</td>
<td class="text-right">
<abbr title="{{connections[nodeCfg.NodeID].InBytesTotal | binary}}B">{{connections[nodeCfg.NodeID].inbps | metric}}bps</abbr>
<span class="text-muted glyphicon glyphicon-chevron-down"></span>
</td>
<td class="text-right">
<abbr title="{{connections[nodeCfg.NodeID].OutBytesTotal | binary}}B">{{connections[nodeCfg.NodeID].outbps | metric}}bps</abbr>
<span class="text-muted glyphicon glyphicon-chevron-up"></span>
</td>
<td class="text-right">
<button type="button" ng-click="editNode(nodeCfg)" class="btn btn-default btn-xs"><span class="glyphicon glyphicon-pencil"></span> Edit</button>
</td>
</tr>
<tr>
<td></td>
<td></td>
<td></td>
<td></td>
<td></td>
<td></td>
<td class="text-right">
<button type="button" class="btn btn-default btn-xs" ng-click="addNode()"><span class="glyphicon glyphicon-plus"></span> Add</button>
</td>
</tr>
</tbody>
</table>
</div>
</div>
</div>
<div class="row">
<div class="col-md-6">
<div class="panel panel-info">
@@ -75,7 +157,9 @@ html, body {
<span class="text-muted">(+{{model.localDeleted | alwaysNumber}} delete records)</span></p>
</div>
</div>
</div>
<div class="col-md-6">
<div class="panel panel-info">
<div class="panel-heading"><h3 class="panel-title">System</h3></div>
<div class="panel-body">
@@ -83,53 +167,34 @@ html, body {
<p>{{system.cpuPercent | alwaysNumber | natural:1}}% CPU, {{system.goroutines | alwaysNumber}} goroutines</p>
</div>
</div>
<div ng-show="model.needFiles > 0">
<h2>Files to Synchronize</h2>
<table class="table table-condensed table-striped">
<tr ng-repeat="file in need track by $index">
<td><abbr title="{{file.Name}}">{{file.ShortName}}</abbr></td>
<td class="text-right">{{file.Size | binary}}B</td>
</tr>
</table>
</div>
</div>
</div>
<div class="row">
<div class="col-md-6">
<div class="panel panel-info">
<div class="panel-heading"><h3 class="panel-title">Cluster</h3></div>
<table class="table table-condensed">
<tbody>
<tr ng-repeat="(node, address) in config.nodes" ng-class="{'text-primary': !!connections[node], 'text-muted': node == config.myID}">
<td><abbr class="text-monospace" title="{{node}}">{{node | short}}</abbr></td>
<td>
<span ng-show="node == config.myID">
<span class="glyphicon glyphicon-ok"></span>
(this node)
</span>
<span ng-show="node != config.myID && !!connections[node]">
<span class="glyphicon glyphicon-link"></span>
{{connections[node].Address}}
</span>
<span ng-show="node != config.myID && !connections[node]">
<span class="glyphicon glyphicon-cog"></span>
{{address}}
</span>
</td>
<td class="text-right">
<span ng-show="node != config.myID">
<abbr title="{{connections[node].InBytesTotal | binary}}B">{{connections[node].inbps | metric}}b/s</abbr>
<span class="text-muted glyphicon glyphicon-cloud-download"></span>
</span>
</td>
<td class="text-right">
<span ng-show="node != config.myID">
<abbr title="{{connections[node].OutBytesTotal | binary}}B">{{connections[node].outbps | metric}}b/s</abbr>
<span class="text-muted glyphicon glyphicon-cloud-upload"></span>
</span>
</td>
</tr>
</tbody>
</table>
<div class="panel-heading"><h3 class="panel-title"><a href="" data-toggle="collapse" data-target="#settingsTable">Settings</a></h3></div>
<div id="settingsTable" class="panel-collapse collapse">
<div class="panel-body">
<form role="form">
<div class="form-group" ng-repeat="setting in settings">
<div ng-if="setting.type == 'string'">
<label for="{{setting.id}}">{{setting.descr}}</label>
<input id="{{setting.id}}" class="form-control" type="text" ng-model="config.Options[setting.id]"></input>
</div>
<div class="checkbox" ng-if="setting.type == 'bool'">
<label>
{{setting.descr}} <input id="{{setting.id}}" type="checkbox" ng-model="config.Options[setting.id]"></input>
</label>
</div>
</div>
</form>
</div>
<div class="panel-footer">
<button type="button" class="btn btn-sm btn-default" ng-click="saveSettings()">Save</button>
<small><span class="text-muted">Changes take effect when restarting syncthing.</span></small>
</div>
</div>
</div>
</div>
</div>
@@ -162,6 +227,45 @@ html, body {
</div>
</div>
<div id="editNode" class="modal fade">
<div class="modal-dialog modal-lg">
<div class="modal-content">
<div class="modal-header">
<button type="button" class="close" data-dismiss="modal" aria-hidden="true">&times;</button>
<h4 class="modal-title">Edit Node</h4>
</div>
<div class="modal-body">
<form role="form">
<div class="form-group">
<label for="nodeID">Node ID</label>
<input placeholder="YUFJOUDPORCMA..." ng-disabled="editingExisting" id="nodeID" class="form-control" type="text" ng-model="currentNode.NodeID"></input>
<p class="help-block">The node ID can be found in the logs or in the "Add Node" dialog on the other node.</p>
</div>
<div class="form-group">
<label for="name">Name</label>
<input placeholder="Home Server" id="name" class="form-control" type="text" ng-model="currentNode.Name"></input>
<p class="help-block">Shown instead of Node ID in the cluster status.</p>
</div>
<div class="form-group">
<label for="addresses">Addresses</label>
<input placeholder="dynamic" id="addresses" class="form-control" type="text" ng-model="currentNode.AddressesStr"></input>
<p class="help-block">Enter comma separated <span class="text-monospace">ip:port</span> addresses or <span class="text-monospace">dynamic</span> to perform automatic discovery of the address.</p>
</div>
</form>
<div ng-show="!editingExisting">
When adding a new node, keep in mind that <em>this node</em> must be added on the other side too. The Node ID of this node is:
<pre>{{myID}}</pre>
</div>
</div>
<div class="modal-footer">
<button type="button" class="btn btn-primary" ng-click="saveNode()">Save</button>
<button type="button" class="btn btn-default" data-dismiss="modal">Close</button>
<button ng-if="editingExisting" type="button" class="btn btn-danger pull-left" ng-click="deleteNode()">Delete</button>
</div>
</div>
</div>
</div>
<script src="angular.min.js"></script>
<script src="jquery-2.0.3.min.js"></script>
<script src="bootstrap/js/bootstrap.min.js"></script>

BIN
gui/st-logo-128.png Normal file
View File

Binary file not shown.

After

Width:  |  Height:  |  Size: 17 KiB

9
gui_development.go Normal file
View File

@@ -0,0 +1,9 @@
//+build guidev
package main
import "github.com/codegangsta/martini"
func embeddedStatic() interface{} {
return martini.Static("gui")
}

46
gui_embedded.go Normal file
View File

@@ -0,0 +1,46 @@
//+build !guidev
package main
import (
"fmt"
"log"
"mime"
"net/http"
"path/filepath"
"time"
"github.com/calmh/syncthing/auto"
"github.com/cratonica/embed"
)
func embeddedStatic() interface{} {
fs, err := embed.Unpack(auto.Resources)
if err != nil {
panic(err)
}
var modt = time.Now().UTC().Format(http.TimeFormat)
return func(res http.ResponseWriter, req *http.Request, log *log.Logger) {
file := req.URL.Path
if file[0] == '/' {
file = file[1:]
}
bs, ok := fs[file]
if !ok {
return
}
mtype := mime.TypeByExtension(filepath.Ext(req.URL.Path))
if len(mtype) != 0 {
res.Header().Set("Content-Type", mtype)
}
res.Header().Set("Content-Size", fmt.Sprintf("%d", len(bs)))
res.Header().Set("Last-Modified", modt)
res.Write(bs)
}
}

5
integration/.gitignore vendored Normal file
View File

@@ -0,0 +1,5 @@
files-*
conf-*
md5-*
genfiles
md5r

42
integration/genfiles.go Normal file
View File

@@ -0,0 +1,42 @@
package main
import (
"crypto/rand"
"flag"
"fmt"
"io/ioutil"
mr "math/rand"
"os"
"path"
)
func name() string {
var b [16]byte
rand.Reader.Read(b[:])
return fmt.Sprintf("%x", b[:])
}
func main() {
var files int
var maxexp int
flag.IntVar(&files, "files", 1000, "Number of files")
flag.IntVar(&maxexp, "maxexp", 20, "Maximum file size (max = 2^n + 128*1024 B)")
flag.Parse()
for i := 0; i < files; i++ {
n := name()
p0 := path.Join(string(n[0]), n[0:2])
os.MkdirAll(p0, 0755)
s := 1 << uint(mr.Intn(maxexp))
a := 128 * 1024
if a > s {
a = s
}
s += mr.Intn(a)
b := make([]byte, s)
rand.Reader.Read(b)
p1 := path.Join(p0, n)
ioutil.WriteFile(p1, b, 0644)
}
}

59
integration/md5r.go Normal file
View File

@@ -0,0 +1,59 @@
package main
import (
"crypto/md5"
"flag"
"fmt"
"io"
"os"
"path/filepath"
)
func main() {
flag.Parse()
args := flag.Args()
if len(args) == 0 {
args = []string{"."}
}
for _, path := range args {
err := filepath.Walk(path, walker)
if err != nil {
fmt.Fprintln(os.Stderr, err)
os.Exit(1)
}
}
}
func walker(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if !info.IsDir() {
sum, err := md5file(path)
if err != nil {
return err
}
fmt.Printf("%s %s\n", sum, path)
}
return nil
}
func md5file(fname string) (hash string, err error) {
f, err := os.Open(fname)
if err != nil {
return
}
defer f.Close()
h := md5.New()
io.Copy(h, f)
hb := h.Sum(nil)
hash = fmt.Sprintf("%x", hb)
return
}

74
integration/test.sh Executable file
View File

@@ -0,0 +1,74 @@
#!/bin/bash
rm -rf files-* conf-* md5-*
extraopts=""
p=$(pwd)
go build genfiles.go
go build md5r.go
echo "Setting up (keys)..."
i1=$(syncthing --home conf-1 2>&1 | awk '/My ID/ {print $7}')
echo $i1
i2=$(syncthing --home conf-2 2>&1 | awk '/My ID/ {print $7}')
echo $i2
i3=$(syncthing --home conf-3 2>&1 | awk '/My ID/ {print $7}')
echo $i3
echo "Setting up (files)..."
for i in 1 2 3 ; do
cat >conf-$i/syncthing.ini <<EOT
[repository]
dir = $p/files-$i
[nodes]
$i1 = 127.0.0.1:22001
$i2 = 127.0.0.1:22002
$i3 = 127.0.0.1:22003
[settings]
gui-enabled = false
listen-address = :2200$i
EOT
mkdir files-$i
pushd files-$i >/dev/null
../genfiles -maxexp 21 -files 400
touch empty-$i
../md5r > ../md5-$i
popd >/dev/null
done
echo "Starting..."
for i in 1 2 3 ; do
sleep 1
syncthing --home conf-$i $extraopts &
done
cat md5-* | sort > md5-tot
while true ; do
read
echo Verifying...
conv=0
for i in 1 2 3 ; do
pushd files-$i >/dev/null
../md5r | sort > ../md5-$i
popd >/dev/null
if ! cmp md5-$i md5-tot >/dev/null ; then
echo $i unconverged
else
conv=$((conv + 1))
echo $i converged
fi
done
if [[ $conv == 3 ]] ; then
kill %1
kill %2
kill %3
exit
fi
done

361
main.go
View File

@@ -3,6 +3,7 @@ package main
import (
"compress/gzip"
"crypto/tls"
"flag"
"fmt"
"log"
"net"
@@ -18,73 +19,39 @@ import (
"github.com/calmh/ini"
"github.com/calmh/syncthing/discover"
flags "github.com/calmh/syncthing/github.com/jessevdk/go-flags"
"github.com/calmh/syncthing/model"
"github.com/calmh/syncthing/protocol"
)
type Options struct {
ConfDir string `short:"c" long:"cfg" description:"Configuration directory" default:"~/.syncthing" value-name:"DIR"`
Listen string `short:"l" long:"listen" description:"Listen address" default:":22000" value-name:"ADDR"`
ReadOnly bool `short:"r" long:"ro" description:"Repository is read only"`
Rehash bool `long:"rehash" description:"Ignore cache and rehash all files in repository"`
NoDelete bool `long:"no-delete" description:"Never delete files"`
NoSymlinks bool `long:"no-symlinks" description:"Don't follow first level symlinks in the repo"`
NoStats bool `long:"no-stats" description:"Don't print model and connection statistics"`
NoGUI bool `long:"no-gui" description:"Don't start GUI"`
GUIAddr string `long:"gui-addr" description:"GUI listen address" default:"127.0.0.1:8080" value-name:"ADDR"`
ShowVersion bool `short:"v" long:"version" description:"Show version"`
Discovery DiscoveryOptions `group:"Discovery Options"`
Advanced AdvancedOptions `group:"Advanced Options"`
Debug DebugOptions `group:"Debugging Options"`
}
type DebugOptions struct {
LogSource bool `long:"log-source"`
TraceModel []string `long:"trace-model" value-name:"TRACE" description:"idx, net, file, need, pull"`
TraceConnect bool `long:"trace-connect"`
Profiler string `long:"profiler" value-name:"ADDR"`
}
type DiscoveryOptions struct {
ExternalServer string `long:"ext-server" description:"External discovery server" value-name:"NAME" default:"syncthing.nym.se"`
ExternalPort int `short:"e" long:"ext-port" description:"External listen port" value-name:"PORT" default:"22000"`
NoExternalDiscovery bool `short:"n" long:"no-ext-announce" description:"Do not announce presence externally"`
NoLocalDiscovery bool `short:"N" long:"no-local-announce" description:"Do not announce presence locally"`
}
type AdvancedOptions struct {
RequestsInFlight int `long:"reqs-in-flight" description:"Parallell in flight requests per node" default:"8" value-name:"REQS"`
LimitRate int `long:"send-rate" description:"Rate limit for outgoing data" default:"0" value-name:"KBPS"`
ScanInterval time.Duration `long:"scan-intv" description:"Repository scan interval" default:"60s" value-name:"INTV"`
ConnInterval time.Duration `long:"conn-intv" description:"Node reconnect interval" default:"60s" value-name:"INTV"`
}
var opts Options
var cfg Configuration
var Version string = "unknown-dev"
const (
confFileName = "syncthing.ini"
var (
myID string
config ini.Config
)
var (
myID string
config ini.Config
nodeAddrs = make(map[string][]string)
showVersion bool
confDir string
trace string
profiler string
verbose bool
)
func main() {
_, err := flags.Parse(&opts)
if err != nil {
if err, ok := err.(*flags.Error); ok {
if err.Type == flags.ErrHelp {
os.Exit(0)
}
}
fatalln(err)
}
log.SetOutput(os.Stderr)
logger = log.New(os.Stderr, "", log.Flags())
if opts.ShowVersion {
flag.StringVar(&confDir, "home", "~/.syncthing", "Set configuration directory")
flag.StringVar(&trace, "debug.trace", "", "(connect,net,idx,file,pull)")
flag.StringVar(&profiler, "debug.profiler", "", "(addr)")
flag.BoolVar(&showVersion, "version", false, "Show version")
flag.BoolVar(&verbose, "v", false, "Be more verbose")
flag.Usage = usageFor(flag.CommandLine, "syncthing [options]")
flag.Parse()
if showVersion {
fmt.Println(Version)
os.Exit(0)
}
@@ -97,32 +64,94 @@ func main() {
runtime.GOMAXPROCS(runtime.NumCPU())
}
log.SetOutput(os.Stderr)
logger = log.New(os.Stderr, "", log.Flags())
if len(opts.Debug.TraceModel) > 0 || opts.Debug.LogSource {
if len(trace) > 0 {
log.SetFlags(log.Lshortfile | log.Ldate | log.Ltime | log.Lmicroseconds)
logger.SetFlags(log.Lshortfile | log.Ldate | log.Ltime | log.Lmicroseconds)
}
opts.ConfDir = expandTilde(opts.ConfDir)
infoln("Version", Version)
confDir = expandTilde(confDir)
// Ensure that our home directory exists and that we have a certificate and key.
ensureDir(opts.ConfDir, 0700)
cert, err := loadCert(opts.ConfDir)
ensureDir(confDir, 0700)
cert, err := loadCert(confDir)
if err != nil {
newCertificate(opts.ConfDir)
cert, err = loadCert(opts.ConfDir)
newCertificate(confDir)
cert, err = loadCert(confDir)
fatalErr(err)
}
myID = string(certId(cert.Certificate[0]))
log.SetPrefix("[" + myID[0:5] + "] ")
logger.SetPrefix("[" + myID[0:5] + "] ")
infoln("Version", Version)
infoln("My ID:", myID)
if opts.Debug.Profiler != "" {
// Prepare to be able to save configuration
cfgFile := path.Join(confDir, "config.xml")
go saveConfigLoop(cfgFile)
// Load the configuration file, if it exists.
// If it does not, create a template.
cf, err := os.Open(cfgFile)
if err == nil {
// Read config.xml
cfg, err = readConfigXML(cf)
if err != nil {
fatalln(err)
}
cf.Close()
} else {
// No config.xml, let's try the old syncthing.ini
iniFile := path.Join(confDir, "syncthing.ini")
cf, err := os.Open(iniFile)
if err == nil {
infoln("Migrating syncthing.ini to config.xml")
iniCfg := ini.Parse(cf)
cf.Close()
os.Rename(iniFile, path.Join(confDir, "migrated_syncthing.ini"))
cfg, _ = readConfigXML(nil)
cfg.Repositories = []RepositoryConfiguration{
{Directory: iniCfg.Get("repository", "dir")},
}
readConfigINI(iniCfg.OptionMap("settings"), &cfg.Options)
for name, addrs := range iniCfg.OptionMap("nodes") {
n := NodeConfiguration{
NodeID: name,
Addresses: strings.Fields(addrs),
}
cfg.Repositories[0].Nodes = append(cfg.Repositories[0].Nodes, n)
}
saveConfig()
}
}
if len(cfg.Repositories) == 0 {
infoln("No config file; starting with empty defaults")
cfg, err = readConfigXML(nil)
cfg.Repositories = []RepositoryConfiguration{
{
Directory: "~/Sync",
Nodes: []NodeConfiguration{
{NodeID: myID, Addresses: []string{"dynamic"}},
},
},
}
saveConfig()
infof("Edit %s to taste or use the GUI\n", cfgFile)
}
var dir = expandTilde(cfg.Repositories[0].Directory)
if len(profiler) > 0 {
go func() {
err := http.ListenAndServe(opts.Debug.Profiler, nil)
err := http.ListenAndServe(profiler, nil)
if err != nil {
warnln(err)
}
@@ -132,7 +161,7 @@ func main() {
// The TLS configuration is used for both the listening socket and outgoing
// connections.
cfg := &tls.Config{
tlsCfg := &tls.Config{
Certificates: []tls.Certificate{cert},
NextProtos: []string{"bep/1.0"},
ServerName: myID,
@@ -142,92 +171,83 @@ func main() {
MinVersion: tls.VersionTLS12,
}
// Load the configuration file, if it exists.
cf, err := os.Open(path.Join(opts.ConfDir, confFileName))
if err != nil {
fatalln("No config file")
config = ini.Config{}
}
config = ini.Parse(cf)
cf.Close()
var dir = expandTilde(config.Get("repository", "dir"))
// Create a map of desired node connections based on the configuration file
// directives.
for nodeID, addrs := range config.OptionMap("nodes") {
addrs := strings.Fields(addrs)
nodeAddrs[nodeID] = addrs
}
ensureDir(dir, -1)
m := model.NewModel(dir)
for _, t := range opts.Debug.TraceModel {
m := model.NewModel(dir, cfg.Options.MaxChangeKbps*1000)
for _, t := range strings.Split(trace, ",") {
m.Trace(t)
}
if opts.Advanced.LimitRate > 0 {
m.LimitRate(opts.Advanced.LimitRate)
if cfg.Options.MaxSendKbps > 0 {
m.LimitRate(cfg.Options.MaxSendKbps)
}
// GUI
if !opts.NoGUI && opts.GUIAddr != "" {
host, port, err := net.SplitHostPort(opts.GUIAddr)
if cfg.Options.GUIEnabled && cfg.Options.GUIAddress != "" {
host, port, err := net.SplitHostPort(cfg.Options.GUIAddress)
if err != nil {
warnf("Cannot start GUI on %q: %v", opts.GUIAddr, err)
warnf("Cannot start GUI on %q: %v", cfg.Options.GUIAddress, err)
} else {
if len(host) > 0 {
infof("Starting web GUI on http://%s", opts.GUIAddr)
infof("Starting web GUI on http://%s", cfg.Options.GUIAddress)
} else {
infof("Starting web GUI on port %s", port)
}
startGUI(opts.GUIAddr, m)
startGUI(cfg.Options.GUIAddress, m)
}
}
// Walk the repository and update the local model before establishing any
// connections to other nodes.
if !opts.Rehash {
infoln("Loading index cache")
loadIndex(m)
if verbose {
infoln("Populating repository index")
}
infoln("Populating repository index")
loadIndex(m)
updateLocalModel(m)
// Routine to listen for incoming connections
infoln("Listening for incoming connections")
go listen(myID, opts.Listen, m, cfg)
if verbose {
infoln("Listening for incoming connections")
}
for _, addr := range cfg.Options.ListenAddress {
go listen(myID, addr, m, tlsCfg)
}
// Routine to connect out to configured nodes
infoln("Attempting to connect to other nodes")
go connect(myID, opts.Listen, nodeAddrs, m, cfg)
if verbose {
infoln("Attempting to connect to other nodes")
}
disc := discovery(cfg.Options.ListenAddress[0])
go connect(myID, disc, m, tlsCfg)
// Routine to pull blocks from other nodes to synchronize the local
// repository. Does not run when we are in read only (publish only) mode.
if !opts.ReadOnly {
if opts.NoDelete {
infoln("Deletes from peer nodes will be ignored")
} else {
infoln("Deletes from peer nodes are allowed")
if !cfg.Options.ReadOnly {
if verbose {
if cfg.Options.AllowDelete {
infoln("Deletes from peer nodes are allowed")
} else {
infoln("Deletes from peer nodes will be ignored")
}
okln("Ready to synchronize (read-write)")
}
okln("Ready to synchronize (read-write)")
m.StartRW(!opts.NoDelete, opts.Advanced.RequestsInFlight)
} else {
m.StartRW(cfg.Options.AllowDelete, cfg.Options.ParallelRequests)
} else if verbose {
okln("Ready to synchronize (read only; no external updates accepted)")
}
// Periodically scan the repository and update the local model.
// XXX: Should use some fsnotify mechanism.
go func() {
td := time.Duration(cfg.Options.RescanIntervalS) * time.Second
for {
time.Sleep(opts.Advanced.ScanInterval)
updateLocalModel(m)
time.Sleep(td)
if m.LocalAge() > (td / 2).Seconds() {
updateLocalModel(m)
}
}
}()
if !opts.NoStats {
if verbose {
// Periodically print statistics
go printStatsLoop(m)
}
@@ -235,6 +255,40 @@ func main() {
select {}
}
var saveConfigCh = make(chan struct{})
func saveConfigLoop(cfgFile string) {
for _ = range saveConfigCh {
fd, err := os.Create(cfgFile + ".tmp")
if err != nil {
warnln(err)
continue
}
err = writeConfigXML(fd, cfg)
if err != nil {
warnln(err)
fd.Close()
continue
}
err = fd.Close()
if err != nil {
warnln(err)
continue
}
err = os.Rename(cfgFile+".tmp", cfgFile)
if err != nil {
warnln(err)
}
}
}
func saveConfig() {
saveConfigCh <- struct{}{}
}
func printStatsLoop(m *model.Model) {
var lastUpdated int64
var lastStats = make(map[string]model.ConnectionInfo)
@@ -248,7 +302,7 @@ func printStatsLoop(m *model.Model) {
outbps := 8 * int(float64(stats.OutBytesTotal-lastStats[node].OutBytesTotal)/secs)
if inbps+outbps > 0 {
infof("%s: %sb/s in, %sb/s out", node, MetricPrefix(inbps), MetricPrefix(outbps))
infof("%s: %sb/s in, %sb/s out", node[0:5], MetricPrefix(inbps), MetricPrefix(outbps))
}
lastStats[node] = stats
@@ -266,10 +320,18 @@ func printStatsLoop(m *model.Model) {
}
}
func listen(myID string, addr string, m *model.Model, cfg *tls.Config) {
l, err := tls.Listen("tcp", addr, cfg)
func listen(myID string, addr string, m *model.Model, tlsCfg *tls.Config) {
if strings.Contains(trace, "connect") {
debugln("NET: Listening on", addr)
}
l, err := tls.Listen("tcp", addr, tlsCfg)
fatalErr(err)
connOpts := map[string]string{
"clientId": "syncthing",
"clientVersion": Version,
}
listen:
for {
conn, err := l.Accept()
@@ -278,7 +340,7 @@ listen:
continue
}
if opts.Debug.TraceConnect {
if strings.Contains(trace, "connect") {
debugln("NET: Connect from", conn.RemoteAddr())
}
@@ -302,9 +364,9 @@ listen:
warnf("Connect from connected node (%s)", remoteID)
}
for nodeID := range nodeAddrs {
if nodeID == remoteID {
protoConn := protocol.NewConnection(remoteID, conn, conn, m)
for _, nodeCfg := range cfg.Repositories[0].Nodes {
if nodeCfg.NodeID == remoteID {
protoConn := protocol.NewConnection(remoteID, conn, conn, m, connOpts)
m.AddConnection(conn, protoConn)
continue listen
}
@@ -313,86 +375,95 @@ listen:
}
}
func connect(myID string, addr string, nodeAddrs map[string][]string, m *model.Model, cfg *tls.Config) {
func discovery(addr string) *discover.Discoverer {
_, portstr, err := net.SplitHostPort(addr)
fatalErr(err)
port, _ := strconv.Atoi(portstr)
if opts.Discovery.NoLocalDiscovery {
if !cfg.Options.LocalAnnEnabled {
port = -1
} else {
} else if verbose {
infoln("Sending local discovery announcements")
}
if opts.Discovery.NoExternalDiscovery {
opts.Discovery.ExternalPort = -1
} else {
if !cfg.Options.GlobalAnnEnabled {
cfg.Options.GlobalAnnServer = ""
} else if verbose {
infoln("Sending external discovery announcements")
}
disc, err := discover.NewDiscoverer(myID, port, opts.Discovery.ExternalPort, opts.Discovery.ExternalServer)
disc, err := discover.NewDiscoverer(myID, port, cfg.Options.GlobalAnnServer)
if err != nil {
warnf("No discovery possible (%v)", err)
}
return disc
}
func connect(myID string, disc *discover.Discoverer, m *model.Model, tlsCfg *tls.Config) {
connOpts := map[string]string{
"clientId": "syncthing",
"clientVersion": Version,
}
for {
nextNode:
for nodeID, addrs := range nodeAddrs {
if nodeID == myID {
for _, nodeCfg := range cfg.Repositories[0].Nodes {
if nodeCfg.NodeID == myID {
continue
}
if m.ConnectedTo(nodeID) {
if m.ConnectedTo(nodeCfg.NodeID) {
continue
}
for _, addr := range addrs {
for _, addr := range nodeCfg.Addresses {
if addr == "dynamic" {
var ok bool
if disc != nil {
addr, ok = disc.Lookup(nodeID)
addr, ok = disc.Lookup(nodeCfg.NodeID)
}
if !ok {
continue
}
}
if opts.Debug.TraceConnect {
debugln("NET: Dial", nodeID, addr)
if strings.Contains(trace, "connect") {
debugln("NET: Dial", nodeCfg.NodeID, addr)
}
conn, err := tls.Dial("tcp", addr, cfg)
conn, err := tls.Dial("tcp", addr, tlsCfg)
if err != nil {
if opts.Debug.TraceConnect {
if strings.Contains(trace, "connect") {
debugln("NET:", err)
}
continue
}
remoteID := certId(conn.ConnectionState().PeerCertificates[0].Raw)
if remoteID != nodeID {
warnln("Unexpected nodeID", remoteID, "!=", nodeID)
if remoteID != nodeCfg.NodeID {
warnln("Unexpected nodeID", remoteID, "!=", nodeCfg.NodeID)
conn.Close()
continue
}
protoConn := protocol.NewConnection(remoteID, conn, conn, m)
protoConn := protocol.NewConnection(remoteID, conn, conn, m, connOpts)
m.AddConnection(conn, protoConn)
continue nextNode
}
}
time.Sleep(opts.Advanced.ConnInterval)
time.Sleep(time.Duration(cfg.Options.ReconnectIntervalS) * time.Second)
}
}
func updateLocalModel(m *model.Model) {
files, _ := m.Walk(!opts.NoSymlinks)
files, _ := m.Walk(cfg.Options.FollowSymlinks)
m.ReplaceLocal(files)
saveIndex(m)
}
func saveIndex(m *model.Model) {
name := m.RepoID() + ".idx.gz"
fullName := path.Join(opts.ConfDir, name)
fullName := path.Join(confDir, name)
idxf, err := os.Create(fullName + ".tmp")
if err != nil {
return
@@ -408,7 +479,7 @@ func saveIndex(m *model.Model) {
func loadIndex(m *model.Model) {
name := m.RepoID() + ".idx.gz"
idxf, err := os.Open(path.Join(opts.ConfDir, name))
idxf, err := os.Open(path.Join(confDir, name))
if err != nil {
return
}

View File

@@ -37,6 +37,15 @@ func Blocks(r io.Reader, blocksize int) ([]Block, error) {
offset += int64(n)
}
if len(blocks) == 0 {
// Empty file
blocks = append(blocks, Block{
Offset: 0,
Size: 0,
Hash: []uint8{0xe3, 0xb0, 0xc4, 0x42, 0x98, 0xfc, 0x1c, 0x14, 0x9a, 0xfb, 0xf4, 0xc8, 0x99, 0x6f, 0xb9, 0x24, 0x27, 0xae, 0x41, 0xe4, 0x64, 0x9b, 0x93, 0x4c, 0xa4, 0x95, 0x99, 0x1b, 0x78, 0x52, 0xb8, 0x55},
})
}
return blocks, nil
}

View File

@@ -11,7 +11,8 @@ var blocksTestData = []struct {
blocksize int
hash []string
}{
{[]byte(""), 1024, []string{}},
{[]byte(""), 1024, []string{
"e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"}},
{[]byte("contents"), 1024, []string{
"d1b2a59fbea7e20077af9f91b27e95e865061b270be03ff539ab3b73587882e8"}},
{[]byte("contents"), 9, []string{
@@ -86,7 +87,7 @@ var diffTestData = []struct {
{"contents", "cantents", 3, []Block{{0, 3, nil}}},
{"contents", "contants", 3, []Block{{3, 3, nil}}},
{"contents", "cantants", 3, []Block{{0, 3, nil}, {3, 3, nil}}},
{"contents", "", 3, nil},
{"contents", "", 3, []Block{{0, 0, nil}}},
{"", "contents", 3, []Block{{0, 3, nil}, {3, 3, nil}, {6, 2, nil}}},
{"con", "contents", 3, []Block{{3, 3, nil}, {6, 2, nil}}},
{"contents", "con", 3, nil},

View File

@@ -4,6 +4,7 @@ import (
"bytes"
"errors"
"fmt"
"log"
"os"
"path"
"sync"
@@ -24,6 +25,10 @@ type fileMonitor struct {
}
func (m *fileMonitor) FileBegins(cc <-chan content) error {
if m.model.trace["file"] {
log.Printf("FILE: FileBegins: " + m.name)
}
tmp := tempName(m.path, m.global.Modified)
dir := path.Dir(tmp)
@@ -104,6 +109,10 @@ func (m *fileMonitor) copyRemoteBlocks(cc <-chan content, outFile *os.File, writ
}
func (m *fileMonitor) FileDone() error {
if m.model.trace["file"] {
log.Printf("FILE: FileDone: " + m.name)
}
m.writeDone.Wait()
tmp := tempName(m.path, m.global.Modified)
@@ -118,7 +127,7 @@ func (m *fileMonitor) FileDone() error {
err := hashCheck(tmp, m.global.Blocks)
if err != nil {
return fmt.Errorf("%s: %s (tmp) (deleting)", path.Base(m.name), err.Error())
return err
}
err = os.Chtimes(tmp, time.Unix(m.global.Modified, 0), time.Unix(m.global.Modified, 0))
@@ -136,7 +145,7 @@ func (m *fileMonitor) FileDone() error {
return err
}
go m.model.updateLocalLocked(m.global)
m.model.updateLocal(m.global)
return nil
}

View File

@@ -14,9 +14,11 @@ type Monitor interface {
type FileQueue struct {
files queuedFileList
lock sync.Mutex
sorted bool
fmut sync.Mutex // protects files and sorted
availability map[string][]string
amut sync.Mutex // protects availability
queued map[string]bool
}
type queuedFile struct {
@@ -56,9 +58,20 @@ type queuedBlock struct {
index int
}
func NewFileQueue() *FileQueue {
return &FileQueue{
availability: make(map[string][]string),
queued: make(map[string]bool),
}
}
func (q *FileQueue) Add(name string, blocks []Block, monitor Monitor) {
q.lock.Lock()
defer q.lock.Unlock()
q.fmut.Lock()
defer q.fmut.Unlock()
if q.queued[name] {
return
}
q.files = append(q.files, queuedFile{
name: name,
@@ -68,19 +81,20 @@ func (q *FileQueue) Add(name string, blocks []Block, monitor Monitor) {
channel: make(chan content),
monitor: monitor,
})
q.queued[name] = true
q.sorted = false
}
func (q *FileQueue) Len() int {
q.lock.Lock()
defer q.lock.Unlock()
q.fmut.Lock()
defer q.fmut.Unlock()
return len(q.files)
}
func (q *FileQueue) Get(nodeID string) (queuedBlock, bool) {
q.lock.Lock()
defer q.lock.Unlock()
q.fmut.Lock()
defer q.fmut.Unlock()
if !q.sorted {
sort.Sort(q.files)
@@ -90,7 +104,11 @@ func (q *FileQueue) Get(nodeID string) (queuedBlock, bool) {
for i := range q.files {
qf := &q.files[i]
if len(q.availability[qf.name]) == 0 {
q.amut.Lock()
av := q.availability[qf.name]
q.amut.Unlock()
if len(av) == 0 {
// Noone has the file we want; abort.
if qf.remaining != len(qf.blocks) {
// We have already started on this file; close it down
@@ -99,11 +117,12 @@ func (q *FileQueue) Get(nodeID string) (queuedBlock, bool) {
mon.FileDone()
}
}
delete(q.queued, qf.name)
q.deleteAt(i)
return queuedBlock{}, false
}
for _, ni := range q.availability[qf.name] {
for _, ni := range av {
// Find and return the next block in the queue
if ni == nodeID {
for j, b := range qf.blocks {
@@ -127,8 +146,8 @@ func (q *FileQueue) Get(nodeID string) (queuedBlock, bool) {
}
func (q *FileQueue) Done(file string, offset int64, data []byte) {
q.lock.Lock()
defer q.lock.Unlock()
q.fmut.Lock()
defer q.fmut.Unlock()
c := content{
offset: offset,
@@ -142,6 +161,7 @@ func (q *FileQueue) Done(file string, offset int64, data []byte) {
err := qf.monitor.FileBegins(qf.channel)
if err != nil {
log.Printf("WARNING: %s: %v (not synced)", qf.name, err)
delete(q.queued, qf.name)
q.deleteAt(i)
return
}
@@ -158,6 +178,7 @@ func (q *FileQueue) Done(file string, offset int64, data []byte) {
log.Printf("WARNING: %s: %v", qf.name, err)
}
}
delete(q.queued, qf.name)
q.deleteAt(i)
}
return
@@ -166,21 +187,9 @@ func (q *FileQueue) Done(file string, offset int64, data []byte) {
panic("unreachable")
}
func (q *FileQueue) Queued(file string) bool {
q.lock.Lock()
defer q.lock.Unlock()
for _, qf := range q.files {
if qf.name == file {
return true
}
}
return false
}
func (q *FileQueue) QueuedFiles() (files []string) {
q.lock.Lock()
defer q.lock.Unlock()
q.fmut.Lock()
defer q.fmut.Unlock()
for _, qf := range q.files {
files = append(files, qf.name)
@@ -196,32 +205,23 @@ func (q *FileQueue) deleteFile(n string) {
for i, file := range q.files {
if n == file.name {
q.deleteAt(i)
delete(q.queued, file.name)
return
}
}
}
func (q *FileQueue) SetAvailable(file, node string) {
q.lock.Lock()
defer q.lock.Unlock()
if q.availability == nil {
q.availability = make(map[string][]string)
}
q.availability[file] = []string{node}
}
func (q *FileQueue) SetAvailable(file string, nodes []string) {
q.amut.Lock()
defer q.amut.Unlock()
func (q *FileQueue) AddAvailable(file, node string) {
q.lock.Lock()
defer q.lock.Unlock()
if q.availability == nil {
q.availability = make(map[string][]string)
}
q.availability[file] = append(q.availability[file], node)
q.availability[file] = nodes
}
func (q *FileQueue) RemoveAvailable(toRemove string) {
q.lock.Lock()
defer q.lock.Unlock()
q.amut.Lock()
defer q.amut.Unlock()
for file, nodes := range q.availability {
for i, node := range nodes {
if node == toRemove {

View File

@@ -8,14 +8,14 @@ import (
)
func TestFileQueueAdd(t *testing.T) {
q := FileQueue{}
q := NewFileQueue()
q.Add("foo", nil, nil)
}
func TestFileQueueAddSorting(t *testing.T) {
q := FileQueue{}
q.SetAvailable("zzz", "nodeID")
q.SetAvailable("aaa", "nodeID")
q := NewFileQueue()
q.SetAvailable("zzz", []string{"nodeID"})
q.SetAvailable("aaa", []string{"nodeID"})
q.Add("zzz", []Block{{Offset: 0, Size: 128}, {Offset: 128, Size: 128}}, nil)
q.Add("aaa", []Block{{Offset: 0, Size: 128}, {Offset: 128, Size: 128}}, nil)
@@ -24,9 +24,9 @@ func TestFileQueueAddSorting(t *testing.T) {
t.Errorf("Incorrectly sorted get: %+v", b)
}
q = FileQueue{}
q.SetAvailable("zzz", "nodeID")
q.SetAvailable("aaa", "nodeID")
q = NewFileQueue()
q.SetAvailable("zzz", []string{"nodeID"})
q.SetAvailable("aaa", []string{"nodeID"})
q.Add("zzz", []Block{{Offset: 0, Size: 128}, {Offset: 128, Size: 128}}, nil)
b, _ = q.Get("nodeID") // Start on zzzz
@@ -42,7 +42,7 @@ func TestFileQueueAddSorting(t *testing.T) {
}
func TestFileQueueLen(t *testing.T) {
q := FileQueue{}
q := NewFileQueue()
q.Add("foo", nil, nil)
q.Add("bar", nil, nil)
@@ -52,9 +52,9 @@ func TestFileQueueLen(t *testing.T) {
}
func TestFileQueueGet(t *testing.T) {
q := FileQueue{}
q.SetAvailable("foo", "nodeID")
q.SetAvailable("bar", "nodeID")
q := NewFileQueue()
q.SetAvailable("foo", []string{"nodeID"})
q.SetAvailable("bar", []string{"nodeID"})
q.Add("foo", []Block{
{Offset: 0, Size: 128, Hash: []byte("some foo hash bytes")},
@@ -177,11 +177,9 @@ func TestFileQueueDone(t *testing.T) {
*/
func TestFileQueueGetNodeIDs(t *testing.T) {
q := FileQueue{}
q.SetAvailable("a-foo", "nodeID")
q.AddAvailable("a-foo", "a")
q.SetAvailable("b-bar", "nodeID")
q.AddAvailable("b-bar", "b")
q := NewFileQueue()
q.SetAvailable("a-foo", []string{"nodeID", "a"})
q.SetAvailable("b-bar", []string{"nodeID", "b"})
q.Add("a-foo", []Block{
{Offset: 0, Size: 128, Hash: []byte("some foo hash bytes")},
@@ -254,9 +252,9 @@ func TestFileQueueThreadHandling(t *testing.T) {
total += i
}
q := FileQueue{}
q := NewFileQueue()
q.Add("foo", blocks, nil)
q.SetAvailable("foo", "nodeID")
q.SetAvailable("foo", []string{"nodeID"})
var start = make(chan bool)
var gotTot uint32

View File

@@ -1,16 +1,5 @@
package model
/*
Locking
=======
The model has read and write locks. These must be acquired as appropriate by
public methods. To prevent deadlock situations, private methods should never
acquire locks, but document what locks they require.
*/
import (
"crypto/sha1"
"errors"
@@ -28,33 +17,41 @@ import (
)
type Model struct {
sync.RWMutex
dir string
global map[string]File // the latest version of each file as it exists in the cluster
gmut sync.RWMutex // protects global
local map[string]File // the files we currently have locally on disk
lmut sync.RWMutex // protects local
remote map[string]map[string]File
rmut sync.RWMutex // protects remote
protoConn map[string]Connection
rawConn map[string]io.Closer
fq FileQueue // queue for files to fetch
dq chan File // queue for files to delete
pmut sync.RWMutex // protects protoConn and rawConn
updatedLocal int64 // timestamp of last update to local
updateGlobal int64 // timestamp of last update to remote
// Queue for files to fetch. fq can call back into the model, so we must ensure
// to hold no locks when calling methods on fq.
fq *FileQueue
dq chan File // queue for files to delete
updatedLocal int64 // timestamp of last update to local
updateGlobal int64 // timestamp of last update to remote
lastIdxBcast time.Time
lastIdxBcastRequest time.Time
umut sync.RWMutex // provides updated* and lastIdx*
rwRunning bool
delete bool
initmut sync.Mutex // protects rwRunning and delete
trace map[string]bool
fileLastChanged map[string]time.Time
fileWasSuppressed map[string]int
sup suppressor
parallellRequests int
limitRequestRate chan struct{}
parallelRequests int
limitRequestRate chan struct{}
imut sync.Mutex // protects Index
}
type Connection interface {
@@ -62,6 +59,7 @@ type Connection interface {
Index([]protocol.FileInfo)
Request(name string, offset int64, size uint32, hash []byte) ([]byte, error)
Statistics() protocol.Statistics
Option(key string) string
}
const (
@@ -80,19 +78,19 @@ var (
// NewModel creates and starts a new model. The model starts in read-only mode,
// where it sends index information to connected peers and responds to requests
// for file data without altering the local repository in any way.
func NewModel(dir string) *Model {
func NewModel(dir string, maxChangeBw int) *Model {
m := &Model{
dir: dir,
global: make(map[string]File),
local: make(map[string]File),
remote: make(map[string]map[string]File),
protoConn: make(map[string]Connection),
rawConn: make(map[string]io.Closer),
lastIdxBcast: time.Now(),
trace: make(map[string]bool),
fileLastChanged: make(map[string]time.Time),
fileWasSuppressed: make(map[string]int),
dq: make(chan File),
dir: dir,
global: make(map[string]File),
local: make(map[string]File),
remote: make(map[string]map[string]File),
protoConn: make(map[string]Connection),
rawConn: make(map[string]io.Closer),
lastIdxBcast: time.Now(),
trace: make(map[string]bool),
sup: suppressor{threshold: int64(maxChangeBw)},
fq: NewFileQueue(),
dq: make(chan File),
}
go m.broadcastIndexLoop()
@@ -116,8 +114,6 @@ func (m *Model) LimitRate(kbps int) {
// Trace enables trace logging of the given facility. This is a debugging function; grep for m.trace.
func (m *Model) Trace(t string) {
m.Lock()
defer m.Unlock()
m.trace[t] = true
}
@@ -125,8 +121,8 @@ func (m *Model) Trace(t string) {
// read/write mode the model will attempt to keep in sync with the cluster by
// pulling needed files from peer nodes.
func (m *Model) StartRW(del bool, threads int) {
m.Lock()
defer m.Unlock()
m.initmut.Lock()
defer m.initmut.Unlock()
if m.rwRunning {
panic("starting started model")
@@ -134,26 +130,35 @@ func (m *Model) StartRW(del bool, threads int) {
m.rwRunning = true
m.delete = del
m.parallellRequests = threads
m.parallelRequests = threads
go m.cleanTempFiles()
if del {
go m.deleteFiles()
go m.deleteLoop()
}
}
// Generation returns an opaque integer that is guaranteed to increment on
// every change to the local repository or global model.
func (m *Model) Generation() int64 {
m.RLock()
defer m.RUnlock()
m.umut.RLock()
defer m.umut.RUnlock()
return m.updatedLocal + m.updateGlobal
}
func (m *Model) LocalAge() float64 {
m.umut.RLock()
defer m.umut.RUnlock()
return time.Since(time.Unix(m.updatedLocal, 0)).Seconds()
}
type ConnectionInfo struct {
protocol.Statistics
Address string
Address string
ClientID string
ClientVersion string
}
// ConnectionStats returns a map with connection statistics for each connected node.
@@ -162,27 +167,29 @@ func (m *Model) ConnectionStats() map[string]ConnectionInfo {
RemoteAddr() net.Addr
}
m.RLock()
defer m.RUnlock()
m.pmut.RLock()
var res = make(map[string]ConnectionInfo)
for node, conn := range m.protoConn {
ci := ConnectionInfo{
Statistics: conn.Statistics(),
Statistics: conn.Statistics(),
ClientID: conn.Option("clientId"),
ClientVersion: conn.Option("clientVersion"),
}
if nc, ok := m.rawConn[node].(remoteAddrer); ok {
ci.Address = nc.RemoteAddr().String()
}
res[node] = ci
}
m.pmut.RUnlock()
return res
}
// LocalSize returns the number of files, deleted files and total bytes for all
// files in the global model.
func (m *Model) GlobalSize() (files, deleted, bytes int) {
m.RLock()
defer m.RUnlock()
m.gmut.RLock()
for _, f := range m.global {
if f.Flags&protocol.FlagDeleted == 0 {
@@ -192,14 +199,15 @@ func (m *Model) GlobalSize() (files, deleted, bytes int) {
deleted++
}
}
m.gmut.RUnlock()
return
}
// LocalSize returns the number of files, deleted files and total bytes for all
// files in the local repository.
func (m *Model) LocalSize() (files, deleted, bytes int) {
m.RLock()
defer m.RUnlock()
m.lmut.RLock()
for _, f := range m.local {
if f.Flags&protocol.FlagDeleted == 0 {
@@ -209,14 +217,16 @@ func (m *Model) LocalSize() (files, deleted, bytes int) {
deleted++
}
}
m.lmut.RUnlock()
return
}
// InSyncSize returns the number and total byte size of the local files that
// are in sync with the global model.
func (m *Model) InSyncSize() (files, bytes int) {
m.RLock()
defer m.RUnlock()
m.gmut.RLock()
m.lmut.RLock()
for n, f := range m.local {
if gf, ok := m.global[n]; ok && f.Equals(gf) {
@@ -224,67 +234,89 @@ func (m *Model) InSyncSize() (files, bytes int) {
bytes += f.Size()
}
}
m.lmut.RUnlock()
m.gmut.RUnlock()
return
}
// NeedFiles returns the list of currently needed files and the total size.
func (m *Model) NeedFiles() (files []File, bytes int) {
m.RLock()
defer m.RUnlock()
qf := m.fq.QueuedFiles()
for _, n := range m.fq.QueuedFiles() {
m.gmut.RLock()
for _, n := range qf {
f := m.global[n]
files = append(files, f)
bytes += f.Size()
}
m.gmut.RUnlock()
return
}
// Index is called when a new node is connected and we receive their full index.
// Implements the protocol.Model interface.
func (m *Model) Index(nodeID string, fs []protocol.FileInfo) {
m.Lock()
defer m.Unlock()
var files = make([]File, len(fs))
for i := range fs {
files[i] = fileFromFileInfo(fs[i])
}
m.imut.Lock()
defer m.imut.Unlock()
if m.trace["net"] {
log.Printf("NET IDX(in): %s: %d files", nodeID, len(fs))
}
repo := make(map[string]File)
for _, f := range fs {
for _, f := range files {
m.indexUpdate(repo, f)
}
m.rmut.Lock()
m.remote[nodeID] = repo
m.rmut.Unlock()
m.recomputeGlobal()
m.recomputeNeed()
m.recomputeNeedForFiles(files)
}
// IndexUpdate is called for incremental updates to connected nodes' indexes.
// Implements the protocol.Model interface.
func (m *Model) IndexUpdate(nodeID string, fs []protocol.FileInfo) {
m.Lock()
defer m.Unlock()
if m.trace["net"] {
log.Printf("NET IDXUP(in): %s: %d files", nodeID, len(fs))
var files = make([]File, len(fs))
for i := range fs {
files[i] = fileFromFileInfo(fs[i])
}
m.imut.Lock()
defer m.imut.Unlock()
if m.trace["net"] {
log.Printf("NET IDXUP(in): %s: %d files", nodeID, len(files))
}
m.rmut.Lock()
repo, ok := m.remote[nodeID]
if !ok {
log.Printf("WARNING: Index update from node %s that does not have an index", nodeID)
m.rmut.Unlock()
return
}
for _, f := range fs {
for _, f := range files {
m.indexUpdate(repo, f)
}
m.rmut.Unlock()
m.recomputeGlobal()
m.recomputeNeed()
m.recomputeNeedForFiles(files)
}
func (m *Model) indexUpdate(repo map[string]File, f protocol.FileInfo) {
func (m *Model) indexUpdate(repo map[string]File, f File) {
if m.trace["idx"] {
var flagComment string
if f.Flags&protocol.FlagDeleted != 0 {
@@ -298,14 +330,16 @@ func (m *Model) indexUpdate(repo map[string]File, f protocol.FileInfo) {
return
}
repo[f.Name] = fileFromFileInfo(f)
repo[f.Name] = f
}
// Close removes the peer from the model and closes the underlyign connection if possible.
// Close removes the peer from the model and closes the underlying connection if possible.
// Implements the protocol.Model interface.
func (m *Model) Close(node string, err error) {
m.Lock()
defer m.Unlock()
m.fq.RemoveAvailable(node)
m.pmut.Lock()
m.rmut.Lock()
conn, ok := m.rawConn[node]
if ok {
@@ -315,20 +349,26 @@ func (m *Model) Close(node string, err error) {
delete(m.remote, node)
delete(m.protoConn, node)
delete(m.rawConn, node)
m.fq.RemoveAvailable(node)
m.rmut.Unlock()
m.pmut.Unlock()
m.recomputeGlobal()
m.recomputeNeed()
m.recomputeNeedForGlobal()
}
// Request returns the specified data segment by reading it from local disk.
// Implements the protocol.Model interface.
func (m *Model) Request(nodeID, name string, offset int64, size uint32, hash []byte) ([]byte, error) {
// Verify that the requested file exists in the local and global model.
m.RLock()
m.lmut.RLock()
lf, localOk := m.local[name]
m.lmut.RUnlock()
m.gmut.RLock()
_, globalOk := m.global[name]
m.RUnlock()
m.gmut.RUnlock()
if !localOk || !globalOk {
log.Printf("SECURITY (nonexistent file) REQ(in): %s: %q o=%d s=%d h=%x", nodeID, name, offset, size, hash)
return nil, ErrNoSuchFile
@@ -363,35 +403,41 @@ func (m *Model) Request(nodeID, name string, offset int64, size uint32, hash []b
}
// ReplaceLocal replaces the local repository index with the given list of files.
// Change suppression is applied to files changing too often.
func (m *Model) ReplaceLocal(fs []File) {
m.Lock()
defer m.Unlock()
var updated bool
var newLocal = make(map[string]File)
m.lmut.RLock()
for _, f := range fs {
newLocal[f.Name] = f
if ef := m.local[f.Name]; !ef.Equals(f) {
updated = true
}
}
m.lmut.RUnlock()
if m.markDeletedLocals(newLocal) {
updated = true
}
m.lmut.RLock()
if len(newLocal) != len(m.local) {
updated = true
}
m.lmut.RUnlock()
if updated {
m.lmut.Lock()
m.local = newLocal
m.lmut.Unlock()
m.recomputeGlobal()
m.recomputeNeed()
m.recomputeNeedForGlobal()
m.umut.Lock()
m.updatedLocal = time.Now().Unix()
m.lastIdxBcastRequest = time.Now()
m.umut.Unlock()
}
}
@@ -399,33 +445,25 @@ func (m *Model) ReplaceLocal(fs []File) {
// in protocol data types. Does not track deletes, should only be used to seed
// the local index from a cache file at startup.
func (m *Model) SeedLocal(fs []protocol.FileInfo) {
m.Lock()
defer m.Unlock()
m.lmut.Lock()
m.local = make(map[string]File)
for _, f := range fs {
m.local[f.Name] = fileFromFileInfo(f)
}
m.lmut.Unlock()
m.recomputeGlobal()
m.recomputeNeed()
m.recomputeNeedForGlobal()
}
// ConnectedTo returns true if we are connected to the named node.
func (m *Model) ConnectedTo(nodeID string) bool {
m.RLock()
defer m.RUnlock()
m.pmut.RLock()
_, ok := m.protoConn[nodeID]
m.pmut.RUnlock()
return ok
}
// ProtocolIndex returns the current local index in protocol data types.
func (m *Model) ProtocolIndex() []protocol.FileInfo {
m.RLock()
defer m.RUnlock()
return m.protocolIndex()
}
// RepoID returns a unique ID representing the current repository location.
func (m *Model) RepoID() string {
return fmt.Sprintf("%x", sha1.Sum([]byte(m.dir)))
@@ -436,79 +474,62 @@ func (m *Model) RepoID() string {
// repository changes.
func (m *Model) AddConnection(rawConn io.Closer, protoConn Connection) {
nodeID := protoConn.ID()
m.Lock()
m.pmut.Lock()
m.protoConn[nodeID] = protoConn
m.rawConn[nodeID] = rawConn
m.Unlock()
m.RLock()
idx := m.protocolIndex()
m.RUnlock()
m.pmut.Unlock()
go func() {
idx := m.ProtocolIndex()
protoConn.Index(idx)
}()
if m.rwRunning {
for i := 0; i < m.parallellRequests; i++ {
i := i
go func() {
if m.trace["pull"] {
log.Println("PULL: Starting", nodeID, i)
}
for {
m.RLock()
if _, ok := m.protoConn[nodeID]; !ok {
if m.trace["pull"] {
log.Println("PULL: Exiting", nodeID, i)
}
m.RUnlock()
return
}
m.RUnlock()
qb, ok := m.fq.Get(nodeID)
if ok {
if m.trace["pull"] {
log.Println("PULL: Request", nodeID, i, qb.name, qb.block.Offset)
}
data, _ := protoConn.Request(qb.name, qb.block.Offset, qb.block.Size, qb.block.Hash)
m.fq.Done(qb.name, qb.block.Offset, data)
} else {
time.Sleep(1 * time.Second)
m.initmut.Lock()
rw := m.rwRunning
m.initmut.Unlock()
if !rw {
return
}
for i := 0; i < m.parallelRequests; i++ {
i := i
go func() {
if m.trace["pull"] {
log.Println("PULL: Starting", nodeID, i)
}
for {
m.pmut.RLock()
if _, ok := m.protoConn[nodeID]; !ok {
if m.trace["pull"] {
log.Println("PULL: Exiting", nodeID, i)
}
m.pmut.RUnlock()
return
}
}()
}
m.pmut.RUnlock()
qb, ok := m.fq.Get(nodeID)
if ok {
if m.trace["pull"] {
log.Println("PULL: Request", nodeID, i, qb.name, qb.block.Offset)
}
data, _ := protoConn.Request(qb.name, qb.block.Offset, qb.block.Size, qb.block.Hash)
m.fq.Done(qb.name, qb.block.Offset, data)
} else {
time.Sleep(1 * time.Second)
}
}
}()
}
}
func (m *Model) shouldSuppressChange(name string) bool {
sup := shouldSuppressChange(m.fileLastChanged[name], m.fileWasSuppressed[name])
if sup {
m.fileWasSuppressed[name]++
} else {
m.fileWasSuppressed[name] = 0
m.fileLastChanged[name] = time.Now()
}
return sup
}
func shouldSuppressChange(lastChange time.Time, numChanges int) bool {
sinceLast := time.Since(lastChange)
if sinceLast > maxFileHoldTimeS*time.Second {
return false
}
if sinceLast < time.Duration((numChanges+2)*minFileHoldTimeS)*time.Second {
return true
}
return false
}
// protocolIndex returns the current local index in protocol data types.
// ProtocolIndex returns the current local index in protocol data types.
// Must be called with the read lock held.
func (m *Model) protocolIndex() []protocol.FileInfo {
func (m *Model) ProtocolIndex() []protocol.FileInfo {
var index []protocol.FileInfo
m.lmut.RLock()
for _, f := range m.local {
mf := fileInfoFromFile(f)
if m.trace["idx"] {
@@ -520,13 +541,16 @@ func (m *Model) protocolIndex() []protocol.FileInfo {
}
index = append(index, mf)
}
m.lmut.RUnlock()
return index
}
func (m *Model) requestGlobal(nodeID, name string, offset int64, size uint32, hash []byte) ([]byte, error) {
m.RLock()
m.pmut.RLock()
nc, ok := m.protoConn[nodeID]
m.RUnlock()
m.pmut.RUnlock()
if !ok {
return nil, fmt.Errorf("requestGlobal: no such node: %s", nodeID)
}
@@ -540,18 +564,23 @@ func (m *Model) requestGlobal(nodeID, name string, offset int64, size uint32, ha
func (m *Model) broadcastIndexLoop() {
for {
m.RLock()
m.umut.RLock()
bcastRequested := m.lastIdxBcastRequest.After(m.lastIdxBcast)
holdtimeExceeded := time.Since(m.lastIdxBcastRequest) > idxBcastHoldtime
m.RUnlock()
m.umut.RUnlock()
maxDelayExceeded := time.Since(m.lastIdxBcast) > idxBcastMaxDelay
if bcastRequested && (holdtimeExceeded || maxDelayExceeded) {
m.Lock()
idx := m.ProtocolIndex()
var indexWg sync.WaitGroup
indexWg.Add(len(m.protoConn))
idx := m.protocolIndex()
m.umut.Lock()
m.lastIdxBcast = time.Now()
m.umut.Unlock()
m.pmut.RLock()
for _, node := range m.protoConn {
node := node
if m.trace["net"] {
@@ -562,7 +591,8 @@ func (m *Model) broadcastIndexLoop() {
indexWg.Done()
}()
}
m.Unlock()
m.pmut.RUnlock()
indexWg.Wait()
}
time.Sleep(idxBcastHoldtime)
@@ -570,13 +600,16 @@ func (m *Model) broadcastIndexLoop() {
}
// markDeletedLocals sets the deleted flag on files that have gone missing locally.
// Must be called with the write lock held.
func (m *Model) markDeletedLocals(newLocal map[string]File) bool {
// For every file in the existing local table, check if they are also
// present in the new local table. If they are not, check that we already
// had the newest version available according to the global table and if so
// note the file as having been deleted.
var updated bool
m.gmut.RLock()
m.lmut.RLock()
for n, f := range m.local {
if _, ok := newLocal[n]; !ok {
if gf := m.global[n]; !gf.NewerThan(f) {
@@ -590,50 +623,91 @@ func (m *Model) markDeletedLocals(newLocal map[string]File) bool {
}
}
}
m.lmut.RUnlock()
m.gmut.RUnlock()
return updated
}
func (m *Model) updateLocalLocked(f File) {
m.Lock()
m.updateLocal(f)
m.Unlock()
}
func (m *Model) updateLocal(f File) {
var updated bool
m.lmut.Lock()
if ef, ok := m.local[f.Name]; !ok || !ef.Equals(f) {
m.local[f.Name] = f
updated = true
}
m.lmut.Unlock()
if updated {
m.recomputeGlobal()
m.recomputeNeed()
// We don't recomputeNeed here for two reasons:
// - a need shouldn't have arisen due to having a newer local file
// - recomputeNeed might call into fq.Add but we might have been called by
// fq which would be a deadlock on fq
m.umut.Lock()
m.updatedLocal = time.Now().Unix()
m.lastIdxBcastRequest = time.Now()
m.umut.Unlock()
}
}
// Must be called with the write lock held.
/*
XXX: Not done, needs elegant handling of availability
func (m *Model) recomputeGlobalFor(files []File) bool {
m.gmut.Lock()
defer m.gmut.Unlock()
var updated bool
for _, f := range files {
if gf, ok := m.global[f.Name]; !ok || f.NewerThan(gf) {
m.global[f.Name] = f
updated = true
// Fix availability
}
}
return updated
}
*/
func (m *Model) recomputeGlobal() {
var newGlobal = make(map[string]File)
m.lmut.RLock()
for n, f := range m.local {
newGlobal[n] = f
}
m.lmut.RUnlock()
var available = make(map[string][]string)
m.rmut.RLock()
var highestMod int64
for nodeID, fs := range m.remote {
for n, nf := range fs {
if lf, ok := newGlobal[n]; !ok || nf.NewerThan(lf) {
newGlobal[n] = nf
m.fq.SetAvailable(n, nodeID)
available[n] = []string{nodeID}
if nf.Modified > highestMod {
highestMod = nf.Modified
}
} else if lf.Equals(nf) {
m.fq.AddAvailable(n, nodeID)
available[n] = append(available[n], nodeID)
}
}
}
m.rmut.RUnlock()
for f, ns := range available {
m.fq.SetAvailable(f, ns)
}
// Figure out if anything actually changed
m.gmut.RLock()
var updated bool
if highestMod > m.updateGlobal || len(newGlobal) != len(m.global) {
updated = true
@@ -645,64 +719,110 @@ func (m *Model) recomputeGlobal() {
}
}
}
m.gmut.RUnlock()
if updated {
m.updateGlobal = time.Now().Unix()
m.gmut.Lock()
m.umut.Lock()
m.global = newGlobal
m.updateGlobal = time.Now().Unix()
m.umut.Unlock()
m.gmut.Unlock()
}
}
// Must be called with the write lock held.
func (m *Model) recomputeNeed() {
for n, gf := range m.global {
if m.fq.Queued(n) {
continue
}
lf, ok := m.local[n]
if !ok || gf.NewerThan(lf) {
if gf.Flags&protocol.FlagInvalid != 0 {
// Never attempt to sync invalid files
continue
}
if gf.Flags&protocol.FlagDeleted != 0 && !m.delete {
// Don't want to delete files, so forget this need
continue
}
if gf.Flags&protocol.FlagDeleted != 0 && !ok {
// Don't have the file, so don't need to delete it
continue
}
if m.trace["need"] {
log.Printf("NEED: lf:%v gf:%v", lf, gf)
}
type addOrder struct {
n string
remote []Block
fm *fileMonitor
}
if gf.Flags&protocol.FlagDeleted != 0 {
m.dq <- gf
} else {
local, remote := BlockDiff(lf.Blocks, gf.Blocks)
fm := fileMonitor{
name: n,
path: path.Clean(path.Join(m.dir, n)),
global: gf,
model: m,
localBlocks: local,
}
m.fq.Add(n, remote, &fm)
func (m *Model) recomputeNeedForGlobal() {
var toDelete []File
var toAdd []addOrder
m.gmut.RLock()
for _, gf := range m.global {
toAdd, toDelete = m.recomputeNeedForFile(gf, toAdd, toDelete)
}
m.gmut.RUnlock()
for _, ao := range toAdd {
m.fq.Add(ao.n, ao.remote, ao.fm)
}
for _, gf := range toDelete {
m.dq <- gf
}
}
func (m *Model) recomputeNeedForFiles(files []File) {
var toDelete []File
var toAdd []addOrder
m.gmut.RLock()
for _, gf := range files {
toAdd, toDelete = m.recomputeNeedForFile(gf, toAdd, toDelete)
}
m.gmut.RUnlock()
for _, ao := range toAdd {
m.fq.Add(ao.n, ao.remote, ao.fm)
}
for _, gf := range toDelete {
m.dq <- gf
}
}
func (m *Model) recomputeNeedForFile(gf File, toAdd []addOrder, toDelete []File) ([]addOrder, []File) {
m.lmut.RLock()
lf, ok := m.local[gf.Name]
m.lmut.RUnlock()
if !ok || gf.NewerThan(lf) {
if gf.Flags&protocol.FlagInvalid != 0 {
// Never attempt to sync invalid files
return toAdd, toDelete
}
if gf.Flags&protocol.FlagDeleted != 0 && !m.delete {
// Don't want to delete files, so forget this need
return toAdd, toDelete
}
if gf.Flags&protocol.FlagDeleted != 0 && !ok {
// Don't have the file, so don't need to delete it
return toAdd, toDelete
}
if m.trace["need"] {
log.Printf("NEED: lf:%v gf:%v", lf, gf)
}
if gf.Flags&protocol.FlagDeleted != 0 {
toDelete = append(toDelete, gf)
} else {
local, remote := BlockDiff(lf.Blocks, gf.Blocks)
fm := fileMonitor{
name: gf.Name,
path: path.Clean(path.Join(m.dir, gf.Name)),
global: gf,
model: m,
localBlocks: local,
}
toAdd = append(toAdd, addOrder{gf.Name, remote, &fm})
}
}
return toAdd, toDelete
}
func (m *Model) WhoHas(name string) []string {
m.RLock()
defer m.RUnlock()
return m.whoHas(name)
}
// Must be called with the read lock held.
func (m *Model) whoHas(name string) []string {
var remote []string
m.gmut.RLock()
m.rmut.RLock()
gf := m.global[name]
for node, files := range m.remote {
if file, ok := files[name]; ok && file.Equals(gf) {
@@ -710,10 +830,12 @@ func (m *Model) whoHas(name string) []string {
}
}
m.rmut.RUnlock()
m.gmut.RUnlock()
return remote
}
func (m *Model) deleteFiles() {
func (m *Model) deleteLoop() {
for file := range m.dq {
if m.trace["file"] {
log.Println("FILE: Delete", file.Name)
@@ -723,7 +845,8 @@ func (m *Model) deleteFiles() {
if err != nil {
log.Printf("WARNING: %s: %v", file.Name, err)
}
m.updateLocalLocked(file)
m.updateLocal(file)
}
}

View File

@@ -12,7 +12,7 @@ import (
)
func TestNewModel(t *testing.T) {
m := NewModel("foo")
m := NewModel("foo", 1e6)
if m == nil {
t.Fatalf("NewModel returned nil")
@@ -34,6 +34,12 @@ var testDataExpected = map[string]File{
Modified: 0,
Blocks: []Block{{Offset: 0x0, Size: 0x7, Hash: []uint8{0xae, 0xc0, 0x70, 0x64, 0x5f, 0xe5, 0x3e, 0xe3, 0xb3, 0x76, 0x30, 0x59, 0x37, 0x61, 0x34, 0xf0, 0x58, 0xcc, 0x33, 0x72, 0x47, 0xc9, 0x78, 0xad, 0xd1, 0x78, 0xb6, 0xcc, 0xdf, 0xb0, 0x1, 0x9f}}},
},
"empty": File{
Name: "empty",
Flags: 0,
Modified: 0,
Blocks: []Block{{Offset: 0x0, Size: 0x0, Hash: []uint8{0xe3, 0xb0, 0xc4, 0x42, 0x98, 0xfc, 0x1c, 0x14, 0x9a, 0xfb, 0xf4, 0xc8, 0x99, 0x6f, 0xb9, 0x24, 0x27, 0xae, 0x41, 0xe4, 0x64, 0x9b, 0x93, 0x4c, 0xa4, 0x95, 0x99, 0x1b, 0x78, 0x52, 0xb8, 0x55}}},
},
"bar": File{
Name: "bar",
Flags: 0,
@@ -53,7 +59,7 @@ func init() {
}
func TestUpdateLocal(t *testing.T) {
m := NewModel("testdata")
m := NewModel("testdata", 1e6)
fs, _ := m.Walk(false)
m.ReplaceLocal(fs)
@@ -95,7 +101,7 @@ func TestUpdateLocal(t *testing.T) {
}
func TestRemoteUpdateExisting(t *testing.T) {
m := NewModel("testdata")
m := NewModel("testdata", 1e6)
fs, _ := m.Walk(false)
m.ReplaceLocal(fs)
@@ -112,7 +118,7 @@ func TestRemoteUpdateExisting(t *testing.T) {
}
func TestRemoteAddNew(t *testing.T) {
m := NewModel("testdata")
m := NewModel("testdata", 1e6)
fs, _ := m.Walk(false)
m.ReplaceLocal(fs)
@@ -129,7 +135,7 @@ func TestRemoteAddNew(t *testing.T) {
}
func TestRemoteUpdateOld(t *testing.T) {
m := NewModel("testdata")
m := NewModel("testdata", 1e6)
fs, _ := m.Walk(false)
m.ReplaceLocal(fs)
@@ -147,7 +153,7 @@ func TestRemoteUpdateOld(t *testing.T) {
}
func TestRemoteIndexUpdate(t *testing.T) {
m := NewModel("testdata")
m := NewModel("testdata", 1e6)
fs, _ := m.Walk(false)
m.ReplaceLocal(fs)
@@ -180,7 +186,7 @@ func TestRemoteIndexUpdate(t *testing.T) {
}
func TestDelete(t *testing.T) {
m := NewModel("testdata")
m := NewModel("testdata", 1e6)
fs, _ := m.Walk(false)
m.ReplaceLocal(fs)
@@ -282,7 +288,7 @@ func TestDelete(t *testing.T) {
}
func TestForgetNode(t *testing.T) {
m := NewModel("testdata")
m := NewModel("testdata", 1e6)
fs, _ := m.Walk(false)
m.ReplaceLocal(fs)
@@ -335,7 +341,7 @@ func TestForgetNode(t *testing.T) {
}
func TestRequest(t *testing.T) {
m := NewModel("testdata")
m := NewModel("testdata", 1e6)
fs, _ := m.Walk(false)
m.ReplaceLocal(fs)
@@ -356,33 +362,8 @@ func TestRequest(t *testing.T) {
}
}
func TestSuppression(t *testing.T) {
var testdata = []struct {
lastChange time.Time
hold int
result bool
}{
{time.Unix(0, 0), 0, false}, // First change
{time.Now().Add(-1 * time.Second), 0, true}, // Changed once one second ago, suppress
{time.Now().Add(-119 * time.Second), 0, true}, // Changed once 119 seconds ago, suppress
{time.Now().Add(-121 * time.Second), 0, false}, // Changed once 121 seconds ago, permit
{time.Now().Add(-179 * time.Second), 1, true}, // Suppressed once 179 seconds ago, suppress again
{time.Now().Add(-181 * time.Second), 1, false}, // Suppressed once 181 seconds ago, permit
{time.Now().Add(-599 * time.Second), 99, true}, // Suppressed lots of times, last allowed 599 seconds ago, suppress again
{time.Now().Add(-601 * time.Second), 99, false}, // Suppressed lots of times, last allowed 601 seconds ago, permit
}
for i, tc := range testdata {
if shouldSuppressChange(tc.lastChange, tc.hold) != tc.result {
t.Errorf("Incorrect result for test #%d: %v", i, tc)
}
}
}
func TestIgnoreWithUnknownFlags(t *testing.T) {
m := NewModel("testdata")
m := NewModel("testdata", 1e6)
fs, _ := m.Walk(false)
m.ReplaceLocal(fs)
@@ -410,10 +391,7 @@ func TestIgnoreWithUnknownFlags(t *testing.T) {
}
}
func prepareModel(n int, m *Model) []protocol.FileInfo {
fs, _ := m.Walk(false)
m.ReplaceLocal(fs)
func genFiles(n int) []protocol.FileInfo {
files := make([]protocol.FileInfo, n)
t := time.Now().Unix()
for i := 0; i < n; i++ {
@@ -424,33 +402,39 @@ func prepareModel(n int, m *Model) []protocol.FileInfo {
}
}
m.Index("42", files)
return files
}
func BenchmarkRecomputeGlobal10k(b *testing.B) {
m := NewModel("testdata")
prepareModel(10000, m)
func BenchmarkIndex10000(b *testing.B) {
m := NewModel("testdata", 1e6)
fs, _ := m.Walk(false)
m.ReplaceLocal(fs)
files := genFiles(10000)
b.ResetTimer()
for i := 0; i < b.N; i++ {
m.recomputeGlobal()
m.Index("42", files)
}
}
func BenchmarkRecomputeNeed10K(b *testing.B) {
m := NewModel("testdata")
prepareModel(10000, m)
func BenchmarkIndex00100(b *testing.B) {
m := NewModel("testdata", 1e6)
fs, _ := m.Walk(false)
m.ReplaceLocal(fs)
files := genFiles(100)
b.ResetTimer()
for i := 0; i < b.N; i++ {
m.recomputeNeed()
m.Index("42", files)
}
}
func BenchmarkIndexUpdate10000(b *testing.B) {
m := NewModel("testdata")
files := prepareModel(10000, m)
func BenchmarkIndexUpdate10000f10000(b *testing.B) {
m := NewModel("testdata", 1e6)
fs, _ := m.Walk(false)
m.ReplaceLocal(fs)
files := genFiles(10000)
m.Index("42", files)
b.ResetTimer()
for i := 0; i < b.N; i++ {
@@ -458,6 +442,34 @@ func BenchmarkIndexUpdate10000(b *testing.B) {
}
}
func BenchmarkIndexUpdate10000f00100(b *testing.B) {
m := NewModel("testdata", 1e6)
fs, _ := m.Walk(false)
m.ReplaceLocal(fs)
files := genFiles(10000)
m.Index("42", files)
ufiles := genFiles(100)
b.ResetTimer()
for i := 0; i < b.N; i++ {
m.IndexUpdate("42", ufiles)
}
}
func BenchmarkIndexUpdate10000f00001(b *testing.B) {
m := NewModel("testdata", 1e6)
fs, _ := m.Walk(false)
m.ReplaceLocal(fs)
files := genFiles(10000)
m.Index("42", files)
ufiles := genFiles(1)
b.ResetTimer()
for i := 0; i < b.N; i++ {
m.IndexUpdate("42", ufiles)
}
}
type FakeConnection struct {
id string
requestData []byte
@@ -471,6 +483,10 @@ func (f FakeConnection) ID() string {
return string(f.id)
}
func (f FakeConnection) Option(string) string {
return ""
}
func (FakeConnection) Index([]protocol.FileInfo) {}
func (f FakeConnection) Request(name string, offset int64, size uint32, hash []byte) ([]byte, error) {
@@ -486,7 +502,7 @@ func (FakeConnection) Statistics() protocol.Statistics {
}
func BenchmarkRequest(b *testing.B) {
m := NewModel("testdata")
m := NewModel("testdata", 1e6)
fs, _ := m.Walk(false)
m.ReplaceLocal(fs)

72
model/suppressor.go Normal file
View File

@@ -0,0 +1,72 @@
package model
import (
"sync"
"time"
)
const (
MAX_CHANGE_HISTORY = 4
)
type change struct {
size int64
when time.Time
}
type changeHistory struct {
changes []change
next int64
prevSup bool
}
type suppressor struct {
sync.Mutex
changes map[string]changeHistory
threshold int64 // bytes/s
}
func (h changeHistory) bandwidth(t time.Time) int64 {
if len(h.changes) == 0 {
return 0
}
var t0 = h.changes[0].when
if t == t0 {
return 0
}
var bw float64
for _, c := range h.changes {
bw += float64(c.size)
}
return int64(bw / t.Sub(t0).Seconds())
}
func (h *changeHistory) append(size int64, t time.Time) {
c := change{size, t}
if len(h.changes) == MAX_CHANGE_HISTORY {
h.changes = h.changes[1:MAX_CHANGE_HISTORY]
}
h.changes = append(h.changes, c)
}
func (s *suppressor) suppress(name string, size int64, t time.Time) (bool, bool) {
s.Lock()
if s.changes == nil {
s.changes = make(map[string]changeHistory)
}
h := s.changes[name]
sup := h.bandwidth(t) > s.threshold
prevSup := h.prevSup
h.prevSup = sup
if !sup {
h.append(size, t)
}
s.changes[name] = h
s.Unlock()
return sup, prevSup
}

113
model/suppressor_test.go Normal file
View File

@@ -0,0 +1,113 @@
package model
import (
"testing"
"time"
)
func TestSuppressor(t *testing.T) {
s := suppressor{threshold: 10000}
t0 := time.Now()
t1 := t0
sup, prev := s.suppress("foo", 10000, t1)
if sup {
t.Fatal("Never suppress first change")
}
if prev {
t.Fatal("Incorrect prev status")
}
// bw is 10000 / 10 = 1000
t1 = t0.Add(10 * time.Second)
if bw := s.changes["foo"].bandwidth(t1); bw != 1000 {
t.Error("Incorrect bw %d", bw)
}
sup, prev = s.suppress("foo", 10000, t1)
if sup {
t.Fatal("Should still be fine")
}
if prev {
t.Fatal("Incorrect prev status")
}
// bw is (10000 + 10000) / 11 = 1818
t1 = t0.Add(11 * time.Second)
if bw := s.changes["foo"].bandwidth(t1); bw != 1818 {
t.Error("Incorrect bw %d", bw)
}
sup, prev = s.suppress("foo", 100500, t1)
if sup {
t.Fatal("Should still be fine")
}
if prev {
t.Fatal("Incorrect prev status")
}
// bw is (10000 + 10000 + 100500) / 12 = 10041
t1 = t0.Add(12 * time.Second)
if bw := s.changes["foo"].bandwidth(t1); bw != 10041 {
t.Error("Incorrect bw %d", bw)
}
sup, prev = s.suppress("foo", 10000000, t1) // value will be ignored
if !sup {
t.Fatal("Should be over threshold")
}
if prev {
t.Fatal("Incorrect prev status")
}
// bw is (10000 + 10000 + 100500) / 15 = 8033
t1 = t0.Add(15 * time.Second)
if bw := s.changes["foo"].bandwidth(t1); bw != 8033 {
t.Error("Incorrect bw %d", bw)
}
sup, prev = s.suppress("foo", 10000000, t1)
if sup {
t.Fatal("Should be Ok")
}
if !prev {
t.Fatal("Incorrect prev status")
}
}
func TestHistory(t *testing.T) {
h := changeHistory{}
t0 := time.Now()
h.append(40, t0)
if l := len(h.changes); l != 1 {
t.Errorf("Incorrect history length %d", l)
}
if s := h.changes[0].size; s != 40 {
t.Errorf("Incorrect first record size %d", s)
}
for i := 1; i < MAX_CHANGE_HISTORY; i++ {
h.append(int64(40+i), t0.Add(time.Duration(i)*time.Second))
}
if l := len(h.changes); l != MAX_CHANGE_HISTORY {
t.Errorf("Incorrect history length %d", l)
}
if s := h.changes[0].size; s != 40 {
t.Errorf("Incorrect first record size %d", s)
}
if s := h.changes[MAX_CHANGE_HISTORY-1].size; s != 40+MAX_CHANGE_HISTORY-1 {
t.Errorf("Incorrect last record size %d", s)
}
h.append(999, t0.Add(time.Duration(999)*time.Second))
if l := len(h.changes); l != MAX_CHANGE_HISTORY {
t.Errorf("Incorrect history length %d", l)
}
if s := h.changes[0].size; s != 41 {
t.Errorf("Incorrect first record size %d", s)
}
if s := h.changes[MAX_CHANGE_HISTORY-1].size; s != 999 {
t.Errorf("Incorrect last record size %d", s)
}
}

0
model/testdata/empty vendored Normal file
View File

View File

@@ -32,7 +32,7 @@ func (f File) Size() (bytes int) {
}
func (f File) String() string {
return fmt.Sprintf("File{Name:%q, Flags:0x%x, Modified:%d, Version:%d:, NumBlocks:%d}",
return fmt.Sprintf("File{Name:%q, Flags:0x%x, Modified:%d, Version:%d, NumBlocks:%d}",
f.Name, f.Flags, f.Modified, f.Version, len(f.Blocks))
}
@@ -85,6 +85,9 @@ func (m *Model) loadIgnoreFiles(ign map[string][]string) filepath.WalkFunc {
func (m *Model) walkAndHashFiles(res *[]File, ign map[string][]string) filepath.WalkFunc {
return func(p string, info os.FileInfo, err error) error {
if err != nil {
if m.trace["file"] {
log.Printf("FILE: %q: %v", p, err)
}
return nil
}
@@ -110,38 +113,36 @@ func (m *Model) walkAndHashFiles(res *[]File, ign map[string][]string) filepath.
}
if info.Mode()&os.ModeType == 0 {
fi, err := os.Stat(p)
if err != nil {
return nil
}
modified := fi.ModTime().Unix()
modified := info.ModTime().Unix()
m.RLock()
hf, ok := m.local[rn]
m.RUnlock()
m.lmut.RLock()
lf, ok := m.local[rn]
m.lmut.RUnlock()
if ok && hf.Modified == modified {
if nf := uint32(info.Mode()); nf != hf.Flags {
hf.Flags = nf
hf.Version++
if ok && lf.Modified == modified {
if nf := uint32(info.Mode()); nf != lf.Flags {
lf.Flags = nf
lf.Version++
}
*res = append(*res, hf)
*res = append(*res, lf)
} else {
m.Lock()
if m.shouldSuppressChange(rn) {
if cur, prev := m.sup.suppress(rn, info.Size(), time.Now()); cur {
if m.trace["file"] {
log.Println("FILE: SUPPRESS:", rn, m.fileWasSuppressed[rn], time.Since(m.fileLastChanged[rn]))
log.Printf("FILE: SUPPRESS: %q change bw over threshold", rn)
}
if !prev {
log.Printf("INFO: Changes to %q are being temporarily suppressed because it changes too frequently.", rn)
}
if ok {
hf.Flags = protocol.FlagInvalid
hf.Version++
*res = append(*res, hf)
lf.Flags = protocol.FlagInvalid
lf.Version++
*res = append(*res, lf)
}
m.Unlock()
return nil
} else if prev && !cur {
log.Printf("INFO: Changes to %q are no longer suppressed.", rn)
}
m.Unlock()
if m.trace["file"] {
log.Printf("FILE: Hash %q", p)
@@ -198,9 +199,9 @@ func (m *Model) Walk(followSymlinks bool) (files []File, ignore map[string][]str
return
}
for _, fi := range fis {
if fi.Mode()&os.ModeSymlink != 0 {
dir := path.Join(m.dir, fi.Name()) + "/"
for _, info := range fis {
if info.Mode()&os.ModeSymlink != 0 {
dir := path.Join(m.dir, info.Name()) + "/"
filepath.Walk(dir, m.loadIgnoreFiles(ignore))
filepath.Walk(dir, hashFiles)
}

View File

@@ -13,6 +13,7 @@ var testdata = []struct {
hash string
}{
{"bar", 10, "2f72cc11a6fcd0271ecef8c61056ee1eb1243be3805bf9a9df98f92f7636b05c"},
{"empty", 0, "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"},
{"foo", 7, "aec070645fe53ee3b3763059376134f058cc337247c978add178b6ccdfb0019f"},
}
@@ -21,7 +22,7 @@ var correctIgnores = map[string][]string{
}
func TestWalk(t *testing.T) {
m := NewModel("testdata")
m := NewModel("testdata", 1e6)
files, ignores := m.Walk(false)
if l1, l2 := len(files), len(testdata); l1 != l2 {

View File

@@ -193,6 +193,33 @@ model, the Index Update merely amends it with new or updated file
information. Any files not mentioned in an Index Update are left
unchanged.
### Options (Type = 7)
This informational message provides information about the client
configuration, version, etc. It is sent at connection initiation and,
optionally, when any of the sent parameters have changed. The message is
in the form of a list of (key, value) pairs, both of string type.
struct OptionsMessage {
KeyValue Options<>;
}
struct KeyValue {
string Key;
string Value;
}
Key ID:s apart from the well known ones are implementation
specific. An implementation is expected to ignore unknown keys. An
implementation may impose limits on key and value size.
Well known keys:
- "clientId" -- The name of the implementation. Example: "syncthing".
- "clientVersion" -- The version of the client. Example: "v1.0.33-47". The
Following the SemVer 2.0 specification for version strings is
encouraged but not enforced.
Example Exchange
----------------

View File

@@ -65,6 +65,14 @@ func (w *marshalWriter) writeResponse(data []byte) {
w.writeBytes(data)
}
func (w *marshalWriter) writeOptions(opts map[string]string) {
w.writeUint32(uint32(len(opts)))
for k, v := range opts {
w.writeString(k)
w.writeString(v)
}
}
func (r *marshalReader) readHeader() header {
return decodeHeader(r.readUint32())
}
@@ -109,3 +117,14 @@ func (r *marshalReader) readRequest() request {
func (r *marshalReader) readResponse() []byte {
return r.readBytes()
}
func (r *marshalReader) readOptions() map[string]string {
n := r.readUint32()
opts := make(map[string]string, n)
for i := 0; i < int(n); i++ {
k := r.readString()
v := r.readString()
opts[k] = v
}
return opts
}

View File

@@ -117,3 +117,23 @@ func BenchmarkWriteRequest(b *testing.B) {
wr.writeRequest(req)
}
}
func TestOptions(t *testing.T) {
opts := map[string]string{
"foo": "bar",
"someKey": "otherValue",
"hello": "",
"": "42",
}
var buf = new(bytes.Buffer)
var wr = marshalWriter{w: buf}
wr.writeOptions(opts)
var rd = marshalReader{r: buf}
var ropts = rd.readOptions()
if !reflect.DeepEqual(opts, ropts) {
t.Error("Incorrect options marshal/demarshal")
}
}

View File

@@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"io"
"log"
"sync"
"time"
@@ -18,6 +19,7 @@ const (
messageTypePing = 4
messageTypePong = 5
messageTypeIndexUpdate = 6
messageTypeOptions = 7
)
const (
@@ -52,16 +54,18 @@ type Model interface {
type Connection struct {
sync.RWMutex
id string
receiver Model
reader io.Reader
mreader *marshalReader
writer io.Writer
mwriter *marshalWriter
closed bool
awaiting map[int]chan asyncResult
nextId int
indexSent map[string][2]int64
id string
receiver Model
reader io.Reader
mreader *marshalReader
writer io.Writer
mwriter *marshalWriter
closed bool
awaiting map[int]chan asyncResult
nextId int
indexSent map[string][2]int64
options map[string]string
optionsLock sync.Mutex
hasSentIndex bool
hasRecvdIndex bool
@@ -81,7 +85,7 @@ const (
pingIdleTime = 5 * time.Minute
)
func NewConnection(nodeID string, reader io.Reader, writer io.Writer, receiver Model) *Connection {
func NewConnection(nodeID string, reader io.Reader, writer io.Writer, receiver Model, options map[string]string) *Connection {
flrd := flate.NewReader(reader)
flwr, err := flate.NewWriter(writer, flate.BestSpeed)
if err != nil {
@@ -101,6 +105,20 @@ func NewConnection(nodeID string, reader io.Reader, writer io.Writer, receiver M
go c.readerLoop()
go c.pingerLoop()
if options != nil {
go func() {
c.Lock()
c.mwriter.writeHeader(header{0, c.nextId, messageTypeOptions})
c.mwriter.writeOptions(options)
err := c.flush()
if err != nil {
log.Println("Warning: Write error during initial handshake:", err)
}
c.nextId++
c.Unlock()
}()
}
return &c
}
@@ -328,6 +346,11 @@ loop:
c.Unlock()
}
case messageTypeOptions:
c.optionsLock.Lock()
c.options = c.mreader.readOptions()
c.optionsLock.Unlock()
default:
c.close(fmt.Errorf("Protocol error: %s: unknown message type %#x", c.ID, hdr.msgType))
break loop
@@ -341,14 +364,15 @@ func (c *Connection) processRequest(msgID int, req request) {
c.Lock()
c.mwriter.writeUint32(encodeHeader(header{0, msgID, messageTypeResponse}))
c.mwriter.writeResponse(data)
err := c.flush()
err := c.mwriter.err
if err == nil {
err = c.flush()
}
c.Unlock()
buffers.Put(data)
if err != nil {
c.close(err)
} else if c.mwriter.err != nil {
c.close(c.mwriter.err)
}
}
@@ -395,3 +419,9 @@ func (c *Connection) Statistics() Statistics {
return stats
}
func (c *Connection) Option(key string) string {
c.optionsLock.Lock()
defer c.optionsLock.Unlock()
return c.options[key]
}

View File

@@ -43,8 +43,8 @@ func TestPing(t *testing.T) {
ar, aw := io.Pipe()
br, bw := io.Pipe()
c0 := NewConnection("c0", ar, bw, nil)
c1 := NewConnection("c1", br, aw, nil)
c0 := NewConnection("c0", ar, bw, nil, nil)
c1 := NewConnection("c1", br, aw, nil, nil)
if ok := c0.ping(); !ok {
t.Error("c0 ping failed")
@@ -67,8 +67,8 @@ func TestPingErr(t *testing.T) {
eaw := &ErrPipe{PipeWriter: *aw, max: i, err: e}
ebw := &ErrPipe{PipeWriter: *bw, max: j, err: e}
c0 := NewConnection("c0", ar, ebw, m0)
NewConnection("c1", br, eaw, m1)
c0 := NewConnection("c0", ar, ebw, m0, nil)
NewConnection("c1", br, eaw, m1, nil)
res := c0.ping()
if (i < 4 || j < 4) && res {
@@ -94,8 +94,8 @@ func TestRequestResponseErr(t *testing.T) {
eaw := &ErrPipe{PipeWriter: *aw, max: i, err: e}
ebw := &ErrPipe{PipeWriter: *bw, max: j, err: e}
NewConnection("c0", ar, ebw, m0)
c1 := NewConnection("c1", br, eaw, m1)
NewConnection("c0", ar, ebw, m0, nil)
c1 := NewConnection("c1", br, eaw, m1, nil)
d, err := c1.Request("tn", 1234, 3456, []byte("hashbytes"))
if err == e || err == ErrClosed {
@@ -143,8 +143,8 @@ func TestVersionErr(t *testing.T) {
ar, aw := io.Pipe()
br, bw := io.Pipe()
c0 := NewConnection("c0", ar, bw, m0)
NewConnection("c1", br, aw, m1)
c0 := NewConnection("c0", ar, bw, m0, nil)
NewConnection("c1", br, aw, m1, nil)
c0.mwriter.writeHeader(header{
version: 2,
@@ -165,8 +165,8 @@ func TestTypeErr(t *testing.T) {
ar, aw := io.Pipe()
br, bw := io.Pipe()
c0 := NewConnection("c0", ar, bw, m0)
NewConnection("c1", br, aw, m1)
c0 := NewConnection("c0", ar, bw, m0, nil)
NewConnection("c1", br, aw, m1, nil)
c0.mwriter.writeHeader(header{
version: 0,
@@ -187,8 +187,8 @@ func TestClose(t *testing.T) {
ar, aw := io.Pipe()
br, bw := io.Pipe()
c0 := NewConnection("c0", ar, bw, m0)
NewConnection("c1", br, aw, m1)
c0 := NewConnection("c0", ar, bw, m0, nil)
NewConnection("c1", br, aw, m1, nil)
c0.close(nil)

52
usage.go Normal file
View File

@@ -0,0 +1,52 @@
package main
import (
"bytes"
"flag"
"fmt"
"io"
"text/tabwriter"
)
func optionTable(w io.Writer, rows [][]string) {
tw := tabwriter.NewWriter(w, 2, 4, 2, ' ', 0)
for _, row := range rows {
for i, cell := range row {
if i > 0 {
tw.Write([]byte("\t"))
}
tw.Write([]byte(cell))
}
tw.Write([]byte("\n"))
}
tw.Flush()
}
func usageFor(fs *flag.FlagSet, usage string) func() {
return func() {
var b bytes.Buffer
b.WriteString("Usage:\n " + usage + "\n")
var options [][]string
fs.VisitAll(func(f *flag.Flag) {
var dash = "-"
if len(f.Name) > 1 {
dash = "--"
}
var opt = " " + dash + f.Name
if f.DefValue != "false" {
opt += "=" + f.DefValue
}
options = append(options, []string{opt, f.Usage})
})
if len(options) > 0 {
b.WriteString("\nOptions:\n")
optionTable(&b, options)
}
fmt.Println(b.String())
}
}