Compare commits

...

13 Commits

Author SHA1 Message Date
Jakob Borg
5980952495 Actually load index cache again (fixes #45) 2014-01-29 22:02:38 +01:00
Jakob Borg
618c376e18 Synchronize zero sized files (fixes #44) 2014-01-29 21:52:27 +01:00
Jakob Borg
d31a126408 CONTRIBUTING.md 2014-01-28 19:10:39 +01:00
Jakob Borg
6d3f8a2c06 Parallell -> parallel (ref #13) 2014-01-26 16:48:20 +01:00
Jakob Borg
b1ba976122 Move auto generated source to a package 2014-01-26 15:02:06 +01:00
Jakob Borg
81d5d1d4a6 Rework config/flags (fixes #13) 2014-01-26 14:45:03 +01:00
Jakob Borg
ea5ef28c5a Performance: improve need computation 2014-01-23 22:20:15 +01:00
Jakob Borg
fc2ebc6cad Performance: make filequeue not suck 2014-01-23 16:39:12 +01:00
Jakob Borg
01096fff6c Add version info to GUI (fixes #41) 2014-01-23 13:13:15 +01:00
Jakob Borg
2ea3558283 Add Options message to protocol 2014-01-23 13:12:45 +01:00
Jakob Borg
20a47695fb Create syncthing.ini template (fixes #39) 2014-01-22 14:28:14 +01:00
Jakob Borg
1dde9ec2d8 New file change suppression algorithm (fixes #30) 2014-01-22 12:52:27 +01:00
Jakob Borg
0841a46055 Don't crash on invalid options 2014-01-22 12:52:15 +01:00
26 changed files with 876 additions and 326 deletions

22
CONTRIBUTING.md Normal file
View File

@@ -0,0 +1,22 @@
Please do contribute!
## Building
[See the wiki](https://github.com/calmh/syncthing/wiki/Building)
## Tests
Yes please!
## Style
`go fmt`
## Documentation
[Hack it here](https://github.com/calmh/syncthing/wiki)
## License
MIT

5
auto/gui.files.go Normal file
View File

File diff suppressed because one or more lines are too long

View File

@@ -11,7 +11,8 @@ fi
if [[ -z $1 ]] ; then
go build -ldflags "-X main.Version $version"
elif [[ $1 == "embed" ]] ; then
embedder main gui > gui.files.go
embedder auto gui > auto/gui.files.go \
&& go build -ldflags "-X main.Version $version"
elif [[ $1 == "tar" ]] ; then
go build -ldflags "-X main.Version $version" \
&& mkdir syncthing-dist \

129
config.go Normal file
View File

@@ -0,0 +1,129 @@
package main
import (
"fmt"
"io"
"reflect"
"strconv"
"strings"
"text/template"
"time"
)
type Options struct {
Listen string `ini:"listen-address" default:":22000" description:"ip:port to for incoming sync connections"`
ReadOnly bool `ini:"read-only" description:"Allow changes to the local repository"`
Delete bool `ini:"allow-delete" default:"true" description:"Allow deletes of files in the local repository"`
Symlinks bool `ini:"follow-symlinks" default:"true" description:"Follow symbolic links at the top level of the repository"`
GUI bool `ini:"gui-enabled" default:"true" description:"Enable the HTTP GUI"`
GUIAddr string `ini:"gui-address" default:"127.0.0.1:8080" description:"ip:port for GUI connections"`
ExternalServer string `ini:"global-announce-server" default:"syncthing.nym.se:22025" description:"Global server for announcements"`
ExternalDiscovery bool `ini:"global-announce-enabled" default:"true" description:"Announce to the global announce server"`
LocalDiscovery bool `ini:"local-announce-enabled" default:"true" description:"Announce to the local network"`
ParallelRequests int `ini:"parallel-requests" default:"16" description:"Maximum number of blocks to request in parallel"`
LimitRate int `ini:"max-send-kbps" description:"Limit outgoing data rate (kbyte/s)"`
ScanInterval time.Duration `ini:"rescan-interval" default:"60s" description:"Scan repository for changes this often"`
ConnInterval time.Duration `ini:"reconnection-interval" default:"60s" description:"Attempt to (re)connect to peers this often"`
MaxChangeBW int `ini:"max-change-bw" default:"1000" description:"Suppress files changing more than this (kbyte/s)"`
}
func loadConfig(m map[string]string, data interface{}) error {
s := reflect.ValueOf(data).Elem()
t := s.Type()
for i := 0; i < s.NumField(); i++ {
f := s.Field(i)
tag := t.Field(i).Tag
name := tag.Get("ini")
if len(name) == 0 {
name = strings.ToLower(t.Field(i).Name)
}
v, ok := m[name]
if !ok {
v = tag.Get("default")
}
if len(v) > 0 {
switch f.Interface().(type) {
case time.Duration:
d, err := time.ParseDuration(v)
if err != nil {
return err
}
f.SetInt(int64(d))
case string:
f.SetString(v)
case int:
i, err := strconv.ParseInt(v, 10, 64)
if err != nil {
return err
}
f.SetInt(i)
case bool:
f.SetBool(v == "true")
default:
panic(f.Type())
}
}
}
return nil
}
type cfg struct {
Key string
Value string
Comment string
}
func structToValues(data interface{}) []cfg {
s := reflect.ValueOf(data).Elem()
t := s.Type()
var vals []cfg
for i := 0; i < s.NumField(); i++ {
f := s.Field(i)
tag := t.Field(i).Tag
var c cfg
c.Key = tag.Get("ini")
if len(c.Key) == 0 {
c.Key = strings.ToLower(t.Field(i).Name)
}
c.Value = fmt.Sprint(f.Interface())
c.Comment = tag.Get("description")
vals = append(vals, c)
}
return vals
}
var configTemplateStr = `[repository]
{{if .comments}}; The directory to synchronize. Will be created if it does not exist.
{{end}}dir = {{.dir}}
[nodes]
{{if .comments}}; Map of node ID to addresses, or "dynamic" for automatic discovery. Examples:
; J3MZ4G5O4CLHJKB25WX47K5NUJUWDOLO2TTNY3TV3NRU4HVQRKEQ = 172.16.32.24:22000
; ZNJZRXQKYHF56A2VVNESRZ6AY4ZOWGFJCV6FXDZJUTRVR3SNBT6Q = dynamic
{{end}}{{range $n, $a := .nodes}}{{$n}} = {{$a}}
{{end}}
[settings]
{{range $v := .settings}}; {{$v.Comment}}
{{$v.Key}} = {{$v.Value}}
{{end}}
`
var configTemplate = template.Must(template.New("config").Parse(configTemplateStr))
func writeConfig(wr io.Writer, dir string, nodes map[string]string, opts Options, comments bool) {
configTemplate.Execute(wr, map[string]interface{}{
"dir": dir,
"nodes": nodes,
"settings": structToValues(&opts),
"comments": comments,
})
}

View File

@@ -100,7 +100,6 @@ type Discoverer struct {
MyID string
ListenPort int
BroadcastIntv time.Duration
ExtListenPort int
ExtBroadcastIntv time.Duration
conn *net.UDPConn
@@ -114,7 +113,7 @@ type Discoverer struct {
// When we hit this many errors in succession, we stop.
const maxErrors = 30
func NewDiscoverer(id string, port int, extPort int, extServer string) (*Discoverer, error) {
func NewDiscoverer(id string, port int, extServer string) (*Discoverer, error) {
local4 := &net.UDPAddr{IP: net.IP{0, 0, 0, 0}, Port: AnnouncementPort}
conn, err := net.ListenUDP("udp4", local4)
if err != nil {
@@ -125,7 +124,6 @@ func NewDiscoverer(id string, port int, extPort int, extServer string) (*Discove
MyID: id,
ListenPort: port,
BroadcastIntv: 30 * time.Second,
ExtListenPort: extPort,
ExtBroadcastIntv: 1800 * time.Second,
conn: conn,
@@ -138,7 +136,7 @@ func NewDiscoverer(id string, port int, extPort int, extServer string) (*Discove
if disc.ListenPort > 0 {
disc.sendAnnouncements()
}
if len(disc.extServer) > 0 && disc.ExtListenPort > 0 {
if len(disc.extServer) > 0 {
disc.sendExtAnnouncements()
}
@@ -153,13 +151,13 @@ func (d *Discoverer) sendAnnouncements() {
}
func (d *Discoverer) sendExtAnnouncements() {
extIP, err := net.ResolveUDPAddr("udp", d.extServer+":22025")
extIP, err := net.ResolveUDPAddr("udp", d.extServer)
if err != nil {
log.Printf("discover/external: %v; no external announcements", err)
return
}
buf := EncodePacket(Packet{AnnouncementMagic, uint16(d.ExtListenPort), d.MyID, nil})
buf := EncodePacket(Packet{AnnouncementMagic, uint16(22000), d.MyID, nil})
go d.writeAnnouncements(buf, extIP, d.ExtBroadcastIntv)
}
@@ -213,7 +211,7 @@ func (d *Discoverer) recvAnnouncements() {
}
func (d *Discoverer) externalLookup(node string) (string, bool) {
extIP, err := net.ResolveUDPAddr("udp", d.extServer+":22025")
extIP, err := net.ResolveUDPAddr("udp", d.extServer)
if err != nil {
log.Printf("discover/external: %v; no external lookup", err)
return "", false

View File

File diff suppressed because one or more lines are too long

3
gui.go
View File

@@ -11,6 +11,7 @@ import (
"sync"
"time"
"github.com/calmh/syncthing/auto"
"github.com/calmh/syncthing/model"
"github.com/codegangsta/martini"
"github.com/cratonica/embed"
@@ -26,7 +27,7 @@ func startGUI(addr string, m *model.Model) {
router.Get("/rest/need", restGetNeed)
router.Get("/rest/system", restGetSystem)
fs, err := embed.Unpack(Resources)
fs, err := embed.Unpack(auto.Resources)
if err != nil {
panic(err)
}

View File

@@ -100,7 +100,11 @@ html, body {
<table class="table table-condensed">
<tbody>
<tr ng-repeat="(node, address) in config.nodes" ng-class="{'text-primary': !!connections[node], 'text-muted': node == config.myID}">
<td><abbr class="text-monospace" title="{{node}}">{{node | short}}</abbr></td>
<td><span class="text-monospace">{{node | short}}</span></td>
<td>
<span ng-show="node != config.myID">{{connections[node].ClientVersion}}</span>
<span ng-show="node == config.myID">{{version}}</span>
</td>
<td>
<span ng-show="node == config.myID">
<span class="glyphicon glyphicon-ok"></span>

View File

@@ -9,11 +9,11 @@ go build genfiles.go
go build md5r.go
echo "Setting up (keys)..."
i1=$(syncthing -c conf-1 2>&1 | awk '/My ID/ {print $6}')
i1=$(syncthing --home conf-1 2>&1 | awk '/My ID/ {print $7}')
echo $i1
i2=$(syncthing -c conf-2 2>&1 | awk '/My ID/ {print $6}')
i2=$(syncthing --home conf-2 2>&1 | awk '/My ID/ {print $7}')
echo $i2
i3=$(syncthing -c conf-3 2>&1 | awk '/My ID/ {print $6}')
i3=$(syncthing --home conf-3 2>&1 | awk '/My ID/ {print $7}')
echo $i3
echo "Setting up (files)..."
@@ -26,11 +26,16 @@ dir = $p/files-$i
$i1 = 127.0.0.1:22001
$i2 = 127.0.0.1:22002
$i3 = 127.0.0.1:22003
[settings]
gui-enabled = false
listen-address = :2200$i
EOT
mkdir files-$i
pushd files-$i >/dev/null
../genfiles -maxexp 21 -files 4000
../genfiles -maxexp 21 -files 400
touch empty-$i
../md5r > ../md5-$i
popd >/dev/null
done
@@ -38,7 +43,7 @@ done
echo "Starting..."
for i in 1 2 3 ; do
sleep 1
syncthing -c conf-$i --no-gui -l :2200$i $extraopts &
syncthing --home conf-$i $extraopts &
done
cat md5-* | sort > md5-tot

216
main.go
View File

@@ -3,6 +3,7 @@ package main
import (
"compress/gzip"
"crypto/tls"
"flag"
"fmt"
"log"
"net"
@@ -18,48 +19,10 @@ import (
"github.com/calmh/ini"
"github.com/calmh/syncthing/discover"
flags "github.com/calmh/syncthing/github.com/jessevdk/go-flags"
"github.com/calmh/syncthing/model"
"github.com/calmh/syncthing/protocol"
)
type Options struct {
ConfDir string `short:"c" long:"cfg" description:"Configuration directory" default:"~/.syncthing" value-name:"DIR"`
Listen string `short:"l" long:"listen" description:"Listen address" default:":22000" value-name:"ADDR"`
ReadOnly bool `short:"r" long:"ro" description:"Repository is read only"`
Rehash bool `long:"rehash" description:"Ignore cache and rehash all files in repository"`
NoDelete bool `long:"no-delete" description:"Never delete files"`
NoSymlinks bool `long:"no-symlinks" description:"Don't follow first level symlinks in the repo"`
NoStats bool `long:"no-stats" description:"Don't print model and connection statistics"`
NoGUI bool `long:"no-gui" description:"Don't start GUI"`
GUIAddr string `long:"gui-addr" description:"GUI listen address" default:"127.0.0.1:8080" value-name:"ADDR"`
ShowVersion bool `short:"v" long:"version" description:"Show version"`
Discovery DiscoveryOptions `group:"Discovery Options"`
Advanced AdvancedOptions `group:"Advanced Options"`
Debug DebugOptions `group:"Debugging Options"`
}
type DebugOptions struct {
LogSource bool `long:"log-source"`
TraceModel []string `long:"trace-model" value-name:"TRACE" description:"idx, net, file, need, pull"`
TraceConnect bool `long:"trace-connect"`
Profiler string `long:"profiler" value-name:"ADDR"`
}
type DiscoveryOptions struct {
ExternalServer string `long:"ext-server" description:"External discovery server" value-name:"NAME" default:"syncthing.nym.se"`
ExternalPort int `short:"e" long:"ext-port" description:"External listen port" value-name:"PORT" default:"22000"`
NoExternalDiscovery bool `short:"n" long:"no-ext-announce" description:"Do not announce presence externally"`
NoLocalDiscovery bool `short:"N" long:"no-local-announce" description:"Do not announce presence locally"`
}
type AdvancedOptions struct {
RequestsInFlight int `long:"reqs-in-flight" description:"Parallell in flight requests per node" default:"8" value-name:"REQS"`
LimitRate int `long:"send-rate" description:"Rate limit for outgoing data" default:"0" value-name:"KBPS"`
ScanInterval time.Duration `long:"scan-intv" description:"Repository scan interval" default:"60s" value-name:"INTV"`
ConnInterval time.Duration `long:"conn-intv" description:"Node reconnect interval" default:"60s" value-name:"INTV"`
}
var opts Options
var Version string = "unknown-dev"
@@ -73,18 +36,27 @@ var (
nodeAddrs = make(map[string][]string)
)
func main() {
_, err := flags.Parse(&opts)
if err != nil {
if err, ok := err.(*flags.Error); ok {
if err.Type == flags.ErrHelp {
os.Exit(0)
}
}
fatalln(err)
}
var (
showVersion bool
showConfig bool
confDir string
trace string
profiler string
)
if opts.ShowVersion {
func main() {
log.SetOutput(os.Stderr)
logger = log.New(os.Stderr, "", log.Flags())
flag.StringVar(&confDir, "home", "~/.syncthing", "Set configuration directory")
flag.BoolVar(&showConfig, "config", false, "Print current configuration")
flag.StringVar(&trace, "debug.trace", "", "(connect,net,idx,file,pull)")
flag.StringVar(&profiler, "debug.profiler", "", "(addr)")
flag.BoolVar(&showVersion, "version", false, "Show version")
flag.Usage = usageFor(flag.CommandLine, "syncthing [options]")
flag.Parse()
if showVersion {
fmt.Println(Version)
os.Exit(0)
}
@@ -97,34 +69,73 @@ func main() {
runtime.GOMAXPROCS(runtime.NumCPU())
}
log.SetOutput(os.Stderr)
logger = log.New(os.Stderr, "", log.Flags())
if len(opts.Debug.TraceModel) > 0 || opts.Debug.LogSource {
if len(trace) > 0 {
log.SetFlags(log.Lshortfile | log.Ldate | log.Ltime | log.Lmicroseconds)
logger.SetFlags(log.Lshortfile | log.Ldate | log.Ltime | log.Lmicroseconds)
}
opts.ConfDir = expandTilde(opts.ConfDir)
infoln("Version", Version)
confDir = expandTilde(confDir)
// Ensure that our home directory exists and that we have a certificate and key.
ensureDir(opts.ConfDir, 0700)
cert, err := loadCert(opts.ConfDir)
ensureDir(confDir, 0700)
cert, err := loadCert(confDir)
if err != nil {
newCertificate(opts.ConfDir)
cert, err = loadCert(opts.ConfDir)
newCertificate(confDir)
cert, err = loadCert(confDir)
fatalErr(err)
}
myID = string(certId(cert.Certificate[0]))
infoln("My ID:", myID)
log.SetPrefix("[" + myID[0:5] + "] ")
logger.SetPrefix("[" + myID[0:5] + "] ")
if opts.Debug.Profiler != "" {
// Load the configuration file, if it exists.
// If it does not, create a template.
cfgFile := path.Join(confDir, confFileName)
cf, err := os.Open(cfgFile)
if err != nil {
infoln("My ID:", myID)
infoln("No config file; creating a template")
loadConfig(nil, &opts) //loads defaults
fd, err := os.Create(cfgFile)
if err != nil {
fatalln(err)
}
writeConfig(fd, "~/Sync", map[string]string{myID: "dynamic"}, opts, true)
fd.Close()
infof("Edit %s to suit and restart syncthing.", cfgFile)
os.Exit(0)
}
config = ini.Parse(cf)
cf.Close()
loadConfig(config.OptionMap("settings"), &opts)
if showConfig {
writeConfig(os.Stdout,
config.Get("repository", "dir"),
config.OptionMap("nodes"), opts, false)
os.Exit(0)
}
infoln("Version", Version)
infoln("My ID:", myID)
var dir = expandTilde(config.Get("repository", "dir"))
if len(dir) == 0 {
fatalln("No repository directory. Set dir under [repository] in syncthing.ini.")
}
if len(profiler) > 0 {
go func() {
err := http.ListenAndServe(opts.Debug.Profiler, nil)
err := http.ListenAndServe(profiler, nil)
if err != nil {
warnln(err)
}
@@ -144,18 +155,6 @@ func main() {
MinVersion: tls.VersionTLS12,
}
// Load the configuration file, if it exists.
cf, err := os.Open(path.Join(opts.ConfDir, confFileName))
if err != nil {
fatalln("No config file")
config = ini.Config{}
}
config = ini.Parse(cf)
cf.Close()
var dir = expandTilde(config.Get("repository", "dir"))
// Create a map of desired node connections based on the configuration file
// directives.
@@ -165,16 +164,16 @@ func main() {
}
ensureDir(dir, -1)
m := model.NewModel(dir)
for _, t := range opts.Debug.TraceModel {
m := model.NewModel(dir, opts.MaxChangeBW*1000)
for _, t := range strings.Split(trace, ",") {
m.Trace(t)
}
if opts.Advanced.LimitRate > 0 {
m.LimitRate(opts.Advanced.LimitRate)
if opts.LimitRate > 0 {
m.LimitRate(opts.LimitRate)
}
// GUI
if !opts.NoGUI && opts.GUIAddr != "" {
if opts.GUI && opts.GUIAddr != "" {
host, port, err := net.SplitHostPort(opts.GUIAddr)
if err != nil {
warnf("Cannot start GUI on %q: %v", opts.GUIAddr, err)
@@ -191,11 +190,8 @@ func main() {
// Walk the repository and update the local model before establishing any
// connections to other nodes.
if !opts.Rehash {
infoln("Loading index cache")
loadIndex(m)
}
infoln("Populating repository index")
loadIndex(m)
updateLocalModel(m)
// Routine to listen for incoming connections
@@ -209,13 +205,13 @@ func main() {
// Routine to pull blocks from other nodes to synchronize the local
// repository. Does not run when we are in read only (publish only) mode.
if !opts.ReadOnly {
if opts.NoDelete {
infoln("Deletes from peer nodes will be ignored")
} else {
if opts.Delete {
infoln("Deletes from peer nodes are allowed")
} else {
infoln("Deletes from peer nodes will be ignored")
}
okln("Ready to synchronize (read-write)")
m.StartRW(!opts.NoDelete, opts.Advanced.RequestsInFlight)
m.StartRW(opts.Delete, opts.ParallelRequests)
} else {
okln("Ready to synchronize (read only; no external updates accepted)")
}
@@ -224,17 +220,15 @@ func main() {
// XXX: Should use some fsnotify mechanism.
go func() {
for {
time.Sleep(opts.Advanced.ScanInterval)
if m.LocalAge() > opts.Advanced.ScanInterval.Seconds()/2 {
time.Sleep(opts.ScanInterval)
if m.LocalAge() > opts.ScanInterval.Seconds()/2 {
updateLocalModel(m)
}
}
}()
if !opts.NoStats {
// Periodically print statistics
go printStatsLoop(m)
}
// Periodically print statistics
go printStatsLoop(m)
select {}
}
@@ -274,6 +268,11 @@ func listen(myID string, addr string, m *model.Model, cfg *tls.Config) {
l, err := tls.Listen("tcp", addr, cfg)
fatalErr(err)
connOpts := map[string]string{
"clientId": "syncthing",
"clientVersion": Version,
}
listen:
for {
conn, err := l.Accept()
@@ -282,7 +281,7 @@ listen:
continue
}
if opts.Debug.TraceConnect {
if strings.Contains(trace, "connect") {
debugln("NET: Connect from", conn.RemoteAddr())
}
@@ -308,7 +307,7 @@ listen:
for nodeID := range nodeAddrs {
if nodeID == remoteID {
protoConn := protocol.NewConnection(remoteID, conn, conn, m)
protoConn := protocol.NewConnection(remoteID, conn, conn, m, connOpts)
m.AddConnection(conn, protoConn)
continue listen
}
@@ -322,24 +321,29 @@ func connect(myID string, addr string, nodeAddrs map[string][]string, m *model.M
fatalErr(err)
port, _ := strconv.Atoi(portstr)
if opts.Discovery.NoLocalDiscovery {
if !opts.LocalDiscovery {
port = -1
} else {
infoln("Sending local discovery announcements")
}
if opts.Discovery.NoExternalDiscovery {
opts.Discovery.ExternalPort = -1
if !opts.ExternalDiscovery {
opts.ExternalServer = ""
} else {
infoln("Sending external discovery announcements")
}
disc, err := discover.NewDiscoverer(myID, port, opts.Discovery.ExternalPort, opts.Discovery.ExternalServer)
disc, err := discover.NewDiscoverer(myID, port, opts.ExternalServer)
if err != nil {
warnf("No discovery possible (%v)", err)
}
connOpts := map[string]string{
"clientId": "syncthing",
"clientVersion": Version,
}
for {
nextNode:
for nodeID, addrs := range nodeAddrs {
@@ -360,12 +364,12 @@ func connect(myID string, addr string, nodeAddrs map[string][]string, m *model.M
}
}
if opts.Debug.TraceConnect {
if strings.Contains(trace, "connect") {
debugln("NET: Dial", nodeID, addr)
}
conn, err := tls.Dial("tcp", addr, cfg)
if err != nil {
if opts.Debug.TraceConnect {
if strings.Contains(trace, "connect") {
debugln("NET:", err)
}
continue
@@ -378,25 +382,25 @@ func connect(myID string, addr string, nodeAddrs map[string][]string, m *model.M
continue
}
protoConn := protocol.NewConnection(remoteID, conn, conn, m)
protoConn := protocol.NewConnection(remoteID, conn, conn, m, connOpts)
m.AddConnection(conn, protoConn)
continue nextNode
}
}
time.Sleep(opts.Advanced.ConnInterval)
time.Sleep(opts.ConnInterval)
}
}
func updateLocalModel(m *model.Model) {
files, _ := m.Walk(!opts.NoSymlinks)
files, _ := m.Walk(opts.Symlinks)
m.ReplaceLocal(files)
saveIndex(m)
}
func saveIndex(m *model.Model) {
name := m.RepoID() + ".idx.gz"
fullName := path.Join(opts.ConfDir, name)
fullName := path.Join(confDir, name)
idxf, err := os.Create(fullName + ".tmp")
if err != nil {
return
@@ -412,7 +416,7 @@ func saveIndex(m *model.Model) {
func loadIndex(m *model.Model) {
name := m.RepoID() + ".idx.gz"
idxf, err := os.Open(path.Join(opts.ConfDir, name))
idxf, err := os.Open(path.Join(confDir, name))
if err != nil {
return
}

View File

@@ -37,6 +37,15 @@ func Blocks(r io.Reader, blocksize int) ([]Block, error) {
offset += int64(n)
}
if len(blocks) == 0 {
// Empty file
blocks = append(blocks, Block{
Offset: 0,
Size: 0,
Hash: []uint8{0xe3, 0xb0, 0xc4, 0x42, 0x98, 0xfc, 0x1c, 0x14, 0x9a, 0xfb, 0xf4, 0xc8, 0x99, 0x6f, 0xb9, 0x24, 0x27, 0xae, 0x41, 0xe4, 0x64, 0x9b, 0x93, 0x4c, 0xa4, 0x95, 0x99, 0x1b, 0x78, 0x52, 0xb8, 0x55},
})
}
return blocks, nil
}

View File

@@ -11,7 +11,8 @@ var blocksTestData = []struct {
blocksize int
hash []string
}{
{[]byte(""), 1024, []string{}},
{[]byte(""), 1024, []string{
"e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"}},
{[]byte("contents"), 1024, []string{
"d1b2a59fbea7e20077af9f91b27e95e865061b270be03ff539ab3b73587882e8"}},
{[]byte("contents"), 9, []string{
@@ -86,7 +87,7 @@ var diffTestData = []struct {
{"contents", "cantents", 3, []Block{{0, 3, nil}}},
{"contents", "contants", 3, []Block{{3, 3, nil}}},
{"contents", "cantants", 3, []Block{{0, 3, nil}, {3, 3, nil}}},
{"contents", "", 3, nil},
{"contents", "", 3, []Block{{0, 0, nil}}},
{"", "contents", 3, []Block{{0, 3, nil}, {3, 3, nil}, {6, 2, nil}}},
{"con", "contents", 3, []Block{{3, 3, nil}, {6, 2, nil}}},
{"contents", "con", 3, nil},

View File

@@ -18,6 +18,7 @@ type FileQueue struct {
fmut sync.Mutex // protects files and sorted
availability map[string][]string
amut sync.Mutex // protects availability
queued map[string]bool
}
type queuedFile struct {
@@ -60,6 +61,7 @@ type queuedBlock struct {
func NewFileQueue() *FileQueue {
return &FileQueue{
availability: make(map[string][]string),
queued: make(map[string]bool),
}
}
@@ -67,10 +69,8 @@ func (q *FileQueue) Add(name string, blocks []Block, monitor Monitor) {
q.fmut.Lock()
defer q.fmut.Unlock()
for _, f := range q.files {
if f.name == name {
panic("re-adding added file " + f.name)
}
if q.queued[name] {
return
}
q.files = append(q.files, queuedFile{
@@ -81,6 +81,7 @@ func (q *FileQueue) Add(name string, blocks []Block, monitor Monitor) {
channel: make(chan content),
monitor: monitor,
})
q.queued[name] = true
q.sorted = false
}
@@ -116,6 +117,7 @@ func (q *FileQueue) Get(nodeID string) (queuedBlock, bool) {
mon.FileDone()
}
}
delete(q.queued, qf.name)
q.deleteAt(i)
return queuedBlock{}, false
}
@@ -159,6 +161,7 @@ func (q *FileQueue) Done(file string, offset int64, data []byte) {
err := qf.monitor.FileBegins(qf.channel)
if err != nil {
log.Printf("WARNING: %s: %v (not synced)", qf.name, err)
delete(q.queued, qf.name)
q.deleteAt(i)
return
}
@@ -175,6 +178,7 @@ func (q *FileQueue) Done(file string, offset int64, data []byte) {
log.Printf("WARNING: %s: %v", qf.name, err)
}
}
delete(q.queued, qf.name)
q.deleteAt(i)
}
return
@@ -183,18 +187,6 @@ func (q *FileQueue) Done(file string, offset int64, data []byte) {
panic("unreachable")
}
func (q *FileQueue) Queued(file string) bool {
q.fmut.Lock()
defer q.fmut.Unlock()
for _, qf := range q.files {
if qf.name == file {
return true
}
}
return false
}
func (q *FileQueue) QueuedFiles() (files []string) {
q.fmut.Lock()
defer q.fmut.Unlock()
@@ -213,6 +205,7 @@ func (q *FileQueue) deleteFile(n string) {
for i, file := range q.files {
if n == file.name {
q.deleteAt(i)
delete(q.queued, file.name)
return
}
}

View File

@@ -46,12 +46,10 @@ type Model struct {
trace map[string]bool
fileLastChanged map[string]time.Time
fileWasSuppressed map[string]int
fmut sync.Mutex // protects fileLastChanged and fileWasSuppressed
sup suppressor
parallellRequests int
limitRequestRate chan struct{}
parallelRequests int
limitRequestRate chan struct{}
imut sync.Mutex // protects Index
}
@@ -61,6 +59,7 @@ type Connection interface {
Index([]protocol.FileInfo)
Request(name string, offset int64, size uint32, hash []byte) ([]byte, error)
Statistics() protocol.Statistics
Option(key string) string
}
const (
@@ -79,20 +78,19 @@ var (
// NewModel creates and starts a new model. The model starts in read-only mode,
// where it sends index information to connected peers and responds to requests
// for file data without altering the local repository in any way.
func NewModel(dir string) *Model {
func NewModel(dir string, maxChangeBw int) *Model {
m := &Model{
dir: dir,
global: make(map[string]File),
local: make(map[string]File),
remote: make(map[string]map[string]File),
protoConn: make(map[string]Connection),
rawConn: make(map[string]io.Closer),
lastIdxBcast: time.Now(),
trace: make(map[string]bool),
fileLastChanged: make(map[string]time.Time),
fileWasSuppressed: make(map[string]int),
fq: NewFileQueue(),
dq: make(chan File),
dir: dir,
global: make(map[string]File),
local: make(map[string]File),
remote: make(map[string]map[string]File),
protoConn: make(map[string]Connection),
rawConn: make(map[string]io.Closer),
lastIdxBcast: time.Now(),
trace: make(map[string]bool),
sup: suppressor{threshold: int64(maxChangeBw)},
fq: NewFileQueue(),
dq: make(chan File),
}
go m.broadcastIndexLoop()
@@ -132,7 +130,7 @@ func (m *Model) StartRW(del bool, threads int) {
m.rwRunning = true
m.delete = del
m.parallellRequests = threads
m.parallelRequests = threads
go m.cleanTempFiles()
if del {
@@ -158,7 +156,9 @@ func (m *Model) LocalAge() float64 {
type ConnectionInfo struct {
protocol.Statistics
Address string
Address string
ClientID string
ClientVersion string
}
// ConnectionStats returns a map with connection statistics for each connected node.
@@ -172,7 +172,9 @@ func (m *Model) ConnectionStats() map[string]ConnectionInfo {
var res = make(map[string]ConnectionInfo)
for node, conn := range m.protoConn {
ci := ConnectionInfo{
Statistics: conn.Statistics(),
Statistics: conn.Statistics(),
ClientID: conn.Option("clientId"),
ClientVersion: conn.Option("clientVersion"),
}
if nc, ok := m.rawConn[node].(remoteAddrer); ok {
ci.Address = nc.RemoteAddr().String()
@@ -257,6 +259,11 @@ func (m *Model) NeedFiles() (files []File, bytes int) {
// Index is called when a new node is connected and we receive their full index.
// Implements the protocol.Model interface.
func (m *Model) Index(nodeID string, fs []protocol.FileInfo) {
var files = make([]File, len(fs))
for i := range fs {
files[i] = fileFromFileInfo(fs[i])
}
m.imut.Lock()
defer m.imut.Unlock()
@@ -265,7 +272,7 @@ func (m *Model) Index(nodeID string, fs []protocol.FileInfo) {
}
repo := make(map[string]File)
for _, f := range fs {
for _, f := range files {
m.indexUpdate(repo, f)
}
@@ -274,17 +281,22 @@ func (m *Model) Index(nodeID string, fs []protocol.FileInfo) {
m.rmut.Unlock()
m.recomputeGlobal()
m.recomputeNeed()
m.recomputeNeedForFiles(files)
}
// IndexUpdate is called for incremental updates to connected nodes' indexes.
// Implements the protocol.Model interface.
func (m *Model) IndexUpdate(nodeID string, fs []protocol.FileInfo) {
var files = make([]File, len(fs))
for i := range fs {
files[i] = fileFromFileInfo(fs[i])
}
m.imut.Lock()
defer m.imut.Unlock()
if m.trace["net"] {
log.Printf("NET IDXUP(in): %s: %d files", nodeID, len(fs))
log.Printf("NET IDXUP(in): %s: %d files", nodeID, len(files))
}
m.rmut.Lock()
@@ -295,16 +307,16 @@ func (m *Model) IndexUpdate(nodeID string, fs []protocol.FileInfo) {
return
}
for _, f := range fs {
for _, f := range files {
m.indexUpdate(repo, f)
}
m.rmut.Unlock()
m.recomputeGlobal()
m.recomputeNeed()
m.recomputeNeedForFiles(files)
}
func (m *Model) indexUpdate(repo map[string]File, f protocol.FileInfo) {
func (m *Model) indexUpdate(repo map[string]File, f File) {
if m.trace["idx"] {
var flagComment string
if f.Flags&protocol.FlagDeleted != 0 {
@@ -318,7 +330,7 @@ func (m *Model) indexUpdate(repo map[string]File, f protocol.FileInfo) {
return
}
repo[f.Name] = fileFromFileInfo(f)
repo[f.Name] = f
}
// Close removes the peer from the model and closes the underlying connection if possible.
@@ -342,7 +354,7 @@ func (m *Model) Close(node string, err error) {
m.pmut.Unlock()
m.recomputeGlobal()
m.recomputeNeed()
m.recomputeNeedForGlobal()
}
// Request returns the specified data segment by reading it from local disk.
@@ -391,7 +403,6 @@ func (m *Model) Request(nodeID, name string, offset int64, size uint32, hash []b
}
// ReplaceLocal replaces the local repository index with the given list of files.
// Change suppression is applied to files changing too often.
func (m *Model) ReplaceLocal(fs []File) {
var updated bool
var newLocal = make(map[string]File)
@@ -421,7 +432,7 @@ func (m *Model) ReplaceLocal(fs []File) {
m.lmut.Unlock()
m.recomputeGlobal()
m.recomputeNeed()
m.recomputeNeedForGlobal()
m.umut.Lock()
m.updatedLocal = time.Now().Unix()
@@ -442,7 +453,7 @@ func (m *Model) SeedLocal(fs []protocol.FileInfo) {
m.lmut.Unlock()
m.recomputeGlobal()
m.recomputeNeed()
m.recomputeNeedForGlobal()
}
// ConnectedTo returns true if we are connected to the named node.
@@ -480,7 +491,7 @@ func (m *Model) AddConnection(rawConn io.Closer, protoConn Connection) {
return
}
for i := 0; i < m.parallellRequests; i++ {
for i := 0; i < m.parallelRequests; i++ {
i := i
go func() {
if m.trace["pull"] {
@@ -512,30 +523,6 @@ func (m *Model) AddConnection(rawConn io.Closer, protoConn Connection) {
}
}
func (m *Model) shouldSuppressChange(name string) bool {
m.fmut.Lock()
sup := shouldSuppressChange(m.fileLastChanged[name], m.fileWasSuppressed[name])
if sup {
m.fileWasSuppressed[name]++
} else {
m.fileWasSuppressed[name] = 0
m.fileLastChanged[name] = time.Now()
}
m.fmut.Unlock()
return sup
}
func shouldSuppressChange(lastChange time.Time, numChanges int) bool {
sinceLast := time.Since(lastChange)
if sinceLast > maxFileHoldTimeS*time.Second {
return false
}
if sinceLast < time.Duration((numChanges+2)*minFileHoldTimeS)*time.Second {
return true
}
return false
}
// ProtocolIndex returns the current local index in protocol data types.
// Must be called with the read lock held.
func (m *Model) ProtocolIndex() []protocol.FileInfo {
@@ -667,6 +654,25 @@ func (m *Model) updateLocal(f File) {
}
}
/*
XXX: Not done, needs elegant handling of availability
func (m *Model) recomputeGlobalFor(files []File) bool {
m.gmut.Lock()
defer m.gmut.Unlock()
var updated bool
for _, f := range files {
if gf, ok := m.global[f.Name]; !ok || f.NewerThan(gf) {
m.global[f.Name] = f
updated = true
// Fix availability
}
}
return updated
}
*/
func (m *Model) recomputeGlobal() {
var newGlobal = make(map[string]File)
@@ -725,68 +731,92 @@ func (m *Model) recomputeGlobal() {
}
}
func (m *Model) recomputeNeed() {
type addOrder struct {
n string
remote []Block
fm *fileMonitor
}
type addOrder struct {
n string
remote []Block
fm *fileMonitor
}
func (m *Model) recomputeNeedForGlobal() {
var toDelete []File
var toAdd []addOrder
m.gmut.RLock()
for n, gf := range m.global {
m.lmut.RLock()
lf, ok := m.local[n]
m.lmut.RUnlock()
if !ok || gf.NewerThan(lf) {
if gf.Flags&protocol.FlagInvalid != 0 {
// Never attempt to sync invalid files
continue
}
if gf.Flags&protocol.FlagDeleted != 0 && !m.delete {
// Don't want to delete files, so forget this need
continue
}
if gf.Flags&protocol.FlagDeleted != 0 && !ok {
// Don't have the file, so don't need to delete it
continue
}
if m.trace["need"] {
log.Printf("NEED: lf:%v gf:%v", lf, gf)
}
if gf.Flags&protocol.FlagDeleted != 0 {
toDelete = append(toDelete, gf)
} else {
local, remote := BlockDiff(lf.Blocks, gf.Blocks)
fm := fileMonitor{
name: n,
path: path.Clean(path.Join(m.dir, n)),
global: gf,
model: m,
localBlocks: local,
}
toAdd = append(toAdd, addOrder{n, remote, &fm})
}
}
for _, gf := range m.global {
toAdd, toDelete = m.recomputeNeedForFile(gf, toAdd, toDelete)
}
m.gmut.RUnlock()
for _, ao := range toAdd {
if !m.fq.Queued(ao.n) {
m.fq.Add(ao.n, ao.remote, ao.fm)
}
m.fq.Add(ao.n, ao.remote, ao.fm)
}
for _, gf := range toDelete {
m.dq <- gf
}
}
func (m *Model) recomputeNeedForFiles(files []File) {
var toDelete []File
var toAdd []addOrder
m.gmut.RLock()
for _, gf := range files {
toAdd, toDelete = m.recomputeNeedForFile(gf, toAdd, toDelete)
}
m.gmut.RUnlock()
for _, ao := range toAdd {
m.fq.Add(ao.n, ao.remote, ao.fm)
}
for _, gf := range toDelete {
m.dq <- gf
}
}
func (m *Model) recomputeNeedForFile(gf File, toAdd []addOrder, toDelete []File) ([]addOrder, []File) {
m.lmut.RLock()
lf, ok := m.local[gf.Name]
m.lmut.RUnlock()
if !ok || gf.NewerThan(lf) {
if gf.Flags&protocol.FlagInvalid != 0 {
// Never attempt to sync invalid files
return toAdd, toDelete
}
if gf.Flags&protocol.FlagDeleted != 0 && !m.delete {
// Don't want to delete files, so forget this need
return toAdd, toDelete
}
if gf.Flags&protocol.FlagDeleted != 0 && !ok {
// Don't have the file, so don't need to delete it
return toAdd, toDelete
}
if m.trace["need"] {
log.Printf("NEED: lf:%v gf:%v", lf, gf)
}
if gf.Flags&protocol.FlagDeleted != 0 {
toDelete = append(toDelete, gf)
} else {
local, remote := BlockDiff(lf.Blocks, gf.Blocks)
fm := fileMonitor{
name: gf.Name,
path: path.Clean(path.Join(m.dir, gf.Name)),
global: gf,
model: m,
localBlocks: local,
}
toAdd = append(toAdd, addOrder{gf.Name, remote, &fm})
}
}
return toAdd, toDelete
}
func (m *Model) WhoHas(name string) []string {
var remote []string

View File

@@ -12,7 +12,7 @@ import (
)
func TestNewModel(t *testing.T) {
m := NewModel("foo")
m := NewModel("foo", 1e6)
if m == nil {
t.Fatalf("NewModel returned nil")
@@ -34,6 +34,12 @@ var testDataExpected = map[string]File{
Modified: 0,
Blocks: []Block{{Offset: 0x0, Size: 0x7, Hash: []uint8{0xae, 0xc0, 0x70, 0x64, 0x5f, 0xe5, 0x3e, 0xe3, 0xb3, 0x76, 0x30, 0x59, 0x37, 0x61, 0x34, 0xf0, 0x58, 0xcc, 0x33, 0x72, 0x47, 0xc9, 0x78, 0xad, 0xd1, 0x78, 0xb6, 0xcc, 0xdf, 0xb0, 0x1, 0x9f}}},
},
"empty": File{
Name: "empty",
Flags: 0,
Modified: 0,
Blocks: []Block{{Offset: 0x0, Size: 0x0, Hash: []uint8{0xe3, 0xb0, 0xc4, 0x42, 0x98, 0xfc, 0x1c, 0x14, 0x9a, 0xfb, 0xf4, 0xc8, 0x99, 0x6f, 0xb9, 0x24, 0x27, 0xae, 0x41, 0xe4, 0x64, 0x9b, 0x93, 0x4c, 0xa4, 0x95, 0x99, 0x1b, 0x78, 0x52, 0xb8, 0x55}}},
},
"bar": File{
Name: "bar",
Flags: 0,
@@ -53,7 +59,7 @@ func init() {
}
func TestUpdateLocal(t *testing.T) {
m := NewModel("testdata")
m := NewModel("testdata", 1e6)
fs, _ := m.Walk(false)
m.ReplaceLocal(fs)
@@ -95,7 +101,7 @@ func TestUpdateLocal(t *testing.T) {
}
func TestRemoteUpdateExisting(t *testing.T) {
m := NewModel("testdata")
m := NewModel("testdata", 1e6)
fs, _ := m.Walk(false)
m.ReplaceLocal(fs)
@@ -112,7 +118,7 @@ func TestRemoteUpdateExisting(t *testing.T) {
}
func TestRemoteAddNew(t *testing.T) {
m := NewModel("testdata")
m := NewModel("testdata", 1e6)
fs, _ := m.Walk(false)
m.ReplaceLocal(fs)
@@ -129,7 +135,7 @@ func TestRemoteAddNew(t *testing.T) {
}
func TestRemoteUpdateOld(t *testing.T) {
m := NewModel("testdata")
m := NewModel("testdata", 1e6)
fs, _ := m.Walk(false)
m.ReplaceLocal(fs)
@@ -147,7 +153,7 @@ func TestRemoteUpdateOld(t *testing.T) {
}
func TestRemoteIndexUpdate(t *testing.T) {
m := NewModel("testdata")
m := NewModel("testdata", 1e6)
fs, _ := m.Walk(false)
m.ReplaceLocal(fs)
@@ -180,7 +186,7 @@ func TestRemoteIndexUpdate(t *testing.T) {
}
func TestDelete(t *testing.T) {
m := NewModel("testdata")
m := NewModel("testdata", 1e6)
fs, _ := m.Walk(false)
m.ReplaceLocal(fs)
@@ -282,7 +288,7 @@ func TestDelete(t *testing.T) {
}
func TestForgetNode(t *testing.T) {
m := NewModel("testdata")
m := NewModel("testdata", 1e6)
fs, _ := m.Walk(false)
m.ReplaceLocal(fs)
@@ -335,7 +341,7 @@ func TestForgetNode(t *testing.T) {
}
func TestRequest(t *testing.T) {
m := NewModel("testdata")
m := NewModel("testdata", 1e6)
fs, _ := m.Walk(false)
m.ReplaceLocal(fs)
@@ -356,33 +362,8 @@ func TestRequest(t *testing.T) {
}
}
func TestSuppression(t *testing.T) {
var testdata = []struct {
lastChange time.Time
hold int
result bool
}{
{time.Unix(0, 0), 0, false}, // First change
{time.Now().Add(-1 * time.Second), 0, true}, // Changed once one second ago, suppress
{time.Now().Add(-119 * time.Second), 0, true}, // Changed once 119 seconds ago, suppress
{time.Now().Add(-121 * time.Second), 0, false}, // Changed once 121 seconds ago, permit
{time.Now().Add(-179 * time.Second), 1, true}, // Suppressed once 179 seconds ago, suppress again
{time.Now().Add(-181 * time.Second), 1, false}, // Suppressed once 181 seconds ago, permit
{time.Now().Add(-599 * time.Second), 99, true}, // Suppressed lots of times, last allowed 599 seconds ago, suppress again
{time.Now().Add(-601 * time.Second), 99, false}, // Suppressed lots of times, last allowed 601 seconds ago, permit
}
for i, tc := range testdata {
if shouldSuppressChange(tc.lastChange, tc.hold) != tc.result {
t.Errorf("Incorrect result for test #%d: %v", i, tc)
}
}
}
func TestIgnoreWithUnknownFlags(t *testing.T) {
m := NewModel("testdata")
m := NewModel("testdata", 1e6)
fs, _ := m.Walk(false)
m.ReplaceLocal(fs)
@@ -410,10 +391,7 @@ func TestIgnoreWithUnknownFlags(t *testing.T) {
}
}
func prepareModel(n int, m *Model) []protocol.FileInfo {
fs, _ := m.Walk(false)
m.ReplaceLocal(fs)
func genFiles(n int) []protocol.FileInfo {
files := make([]protocol.FileInfo, n)
t := time.Now().Unix()
for i := 0; i < n; i++ {
@@ -424,33 +402,39 @@ func prepareModel(n int, m *Model) []protocol.FileInfo {
}
}
m.Index("42", files)
return files
}
func BenchmarkRecomputeGlobal10k(b *testing.B) {
m := NewModel("testdata")
prepareModel(10000, m)
func BenchmarkIndex10000(b *testing.B) {
m := NewModel("testdata", 1e6)
fs, _ := m.Walk(false)
m.ReplaceLocal(fs)
files := genFiles(10000)
b.ResetTimer()
for i := 0; i < b.N; i++ {
m.recomputeGlobal()
m.Index("42", files)
}
}
func BenchmarkRecomputeNeed10K(b *testing.B) {
m := NewModel("testdata")
prepareModel(10000, m)
func BenchmarkIndex00100(b *testing.B) {
m := NewModel("testdata", 1e6)
fs, _ := m.Walk(false)
m.ReplaceLocal(fs)
files := genFiles(100)
b.ResetTimer()
for i := 0; i < b.N; i++ {
m.recomputeNeed()
m.Index("42", files)
}
}
func BenchmarkIndexUpdate10000(b *testing.B) {
m := NewModel("testdata")
files := prepareModel(10000, m)
func BenchmarkIndexUpdate10000f10000(b *testing.B) {
m := NewModel("testdata", 1e6)
fs, _ := m.Walk(false)
m.ReplaceLocal(fs)
files := genFiles(10000)
m.Index("42", files)
b.ResetTimer()
for i := 0; i < b.N; i++ {
@@ -458,6 +442,34 @@ func BenchmarkIndexUpdate10000(b *testing.B) {
}
}
func BenchmarkIndexUpdate10000f00100(b *testing.B) {
m := NewModel("testdata", 1e6)
fs, _ := m.Walk(false)
m.ReplaceLocal(fs)
files := genFiles(10000)
m.Index("42", files)
ufiles := genFiles(100)
b.ResetTimer()
for i := 0; i < b.N; i++ {
m.IndexUpdate("42", ufiles)
}
}
func BenchmarkIndexUpdate10000f00001(b *testing.B) {
m := NewModel("testdata", 1e6)
fs, _ := m.Walk(false)
m.ReplaceLocal(fs)
files := genFiles(10000)
m.Index("42", files)
ufiles := genFiles(1)
b.ResetTimer()
for i := 0; i < b.N; i++ {
m.IndexUpdate("42", ufiles)
}
}
type FakeConnection struct {
id string
requestData []byte
@@ -471,6 +483,10 @@ func (f FakeConnection) ID() string {
return string(f.id)
}
func (f FakeConnection) Option(string) string {
return ""
}
func (FakeConnection) Index([]protocol.FileInfo) {}
func (f FakeConnection) Request(name string, offset int64, size uint32, hash []byte) ([]byte, error) {
@@ -486,7 +502,7 @@ func (FakeConnection) Statistics() protocol.Statistics {
}
func BenchmarkRequest(b *testing.B) {
m := NewModel("testdata")
m := NewModel("testdata", 1e6)
fs, _ := m.Walk(false)
m.ReplaceLocal(fs)

72
model/suppressor.go Normal file
View File

@@ -0,0 +1,72 @@
package model
import (
"sync"
"time"
)
const (
MAX_CHANGE_HISTORY = 4
)
type change struct {
size int64
when time.Time
}
type changeHistory struct {
changes []change
next int64
prevSup bool
}
type suppressor struct {
sync.Mutex
changes map[string]changeHistory
threshold int64 // bytes/s
}
func (h changeHistory) bandwidth(t time.Time) int64 {
if len(h.changes) == 0 {
return 0
}
var t0 = h.changes[0].when
if t == t0 {
return 0
}
var bw float64
for _, c := range h.changes {
bw += float64(c.size)
}
return int64(bw / t.Sub(t0).Seconds())
}
func (h *changeHistory) append(size int64, t time.Time) {
c := change{size, t}
if len(h.changes) == MAX_CHANGE_HISTORY {
h.changes = h.changes[1:MAX_CHANGE_HISTORY]
}
h.changes = append(h.changes, c)
}
func (s *suppressor) suppress(name string, size int64, t time.Time) (bool, bool) {
s.Lock()
if s.changes == nil {
s.changes = make(map[string]changeHistory)
}
h := s.changes[name]
sup := h.bandwidth(t) > s.threshold
prevSup := h.prevSup
h.prevSup = sup
if !sup {
h.append(size, t)
}
s.changes[name] = h
s.Unlock()
return sup, prevSup
}

113
model/suppressor_test.go Normal file
View File

@@ -0,0 +1,113 @@
package model
import (
"testing"
"time"
)
func TestSuppressor(t *testing.T) {
s := suppressor{threshold: 10000}
t0 := time.Now()
t1 := t0
sup, prev := s.suppress("foo", 10000, t1)
if sup {
t.Fatal("Never suppress first change")
}
if prev {
t.Fatal("Incorrect prev status")
}
// bw is 10000 / 10 = 1000
t1 = t0.Add(10 * time.Second)
if bw := s.changes["foo"].bandwidth(t1); bw != 1000 {
t.Error("Incorrect bw %d", bw)
}
sup, prev = s.suppress("foo", 10000, t1)
if sup {
t.Fatal("Should still be fine")
}
if prev {
t.Fatal("Incorrect prev status")
}
// bw is (10000 + 10000) / 11 = 1818
t1 = t0.Add(11 * time.Second)
if bw := s.changes["foo"].bandwidth(t1); bw != 1818 {
t.Error("Incorrect bw %d", bw)
}
sup, prev = s.suppress("foo", 100500, t1)
if sup {
t.Fatal("Should still be fine")
}
if prev {
t.Fatal("Incorrect prev status")
}
// bw is (10000 + 10000 + 100500) / 12 = 10041
t1 = t0.Add(12 * time.Second)
if bw := s.changes["foo"].bandwidth(t1); bw != 10041 {
t.Error("Incorrect bw %d", bw)
}
sup, prev = s.suppress("foo", 10000000, t1) // value will be ignored
if !sup {
t.Fatal("Should be over threshold")
}
if prev {
t.Fatal("Incorrect prev status")
}
// bw is (10000 + 10000 + 100500) / 15 = 8033
t1 = t0.Add(15 * time.Second)
if bw := s.changes["foo"].bandwidth(t1); bw != 8033 {
t.Error("Incorrect bw %d", bw)
}
sup, prev = s.suppress("foo", 10000000, t1)
if sup {
t.Fatal("Should be Ok")
}
if !prev {
t.Fatal("Incorrect prev status")
}
}
func TestHistory(t *testing.T) {
h := changeHistory{}
t0 := time.Now()
h.append(40, t0)
if l := len(h.changes); l != 1 {
t.Errorf("Incorrect history length %d", l)
}
if s := h.changes[0].size; s != 40 {
t.Errorf("Incorrect first record size %d", s)
}
for i := 1; i < MAX_CHANGE_HISTORY; i++ {
h.append(int64(40+i), t0.Add(time.Duration(i)*time.Second))
}
if l := len(h.changes); l != MAX_CHANGE_HISTORY {
t.Errorf("Incorrect history length %d", l)
}
if s := h.changes[0].size; s != 40 {
t.Errorf("Incorrect first record size %d", s)
}
if s := h.changes[MAX_CHANGE_HISTORY-1].size; s != 40+MAX_CHANGE_HISTORY-1 {
t.Errorf("Incorrect last record size %d", s)
}
h.append(999, t0.Add(time.Duration(999)*time.Second))
if l := len(h.changes); l != MAX_CHANGE_HISTORY {
t.Errorf("Incorrect history length %d", l)
}
if s := h.changes[0].size; s != 41 {
t.Errorf("Incorrect first record size %d", s)
}
if s := h.changes[MAX_CHANGE_HISTORY-1].size; s != 999 {
t.Errorf("Incorrect last record size %d", s)
}
}

0
model/testdata/empty vendored Normal file
View File

View File

@@ -32,7 +32,7 @@ func (f File) Size() (bytes int) {
}
func (f File) String() string {
return fmt.Sprintf("File{Name:%q, Flags:0x%x, Modified:%d, Version:%d:, NumBlocks:%d}",
return fmt.Sprintf("File{Name:%q, Flags:0x%x, Modified:%d, Version:%d, NumBlocks:%d}",
f.Name, f.Flags, f.Modified, f.Version, len(f.Blocks))
}
@@ -126,9 +126,12 @@ func (m *Model) walkAndHashFiles(res *[]File, ign map[string][]string) filepath.
}
*res = append(*res, lf)
} else {
if m.shouldSuppressChange(rn) {
if cur, prev := m.sup.suppress(rn, info.Size(), time.Now()); cur {
if m.trace["file"] {
log.Println("FILE: SUPPRESS:", rn, m.fileWasSuppressed[rn], time.Since(m.fileLastChanged[rn]))
log.Printf("FILE: SUPPRESS: %q change bw over threshold", rn)
}
if !prev {
log.Printf("INFO: Changes to %q are being temporarily suppressed because it changes too frequently.", rn)
}
if ok {
@@ -137,6 +140,8 @@ func (m *Model) walkAndHashFiles(res *[]File, ign map[string][]string) filepath.
*res = append(*res, lf)
}
return nil
} else if prev && !cur {
log.Printf("INFO: Changes to %q are no longer suppressed.", rn)
}
if m.trace["file"] {

View File

@@ -13,6 +13,7 @@ var testdata = []struct {
hash string
}{
{"bar", 10, "2f72cc11a6fcd0271ecef8c61056ee1eb1243be3805bf9a9df98f92f7636b05c"},
{"empty", 0, "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"},
{"foo", 7, "aec070645fe53ee3b3763059376134f058cc337247c978add178b6ccdfb0019f"},
}
@@ -21,7 +22,7 @@ var correctIgnores = map[string][]string{
}
func TestWalk(t *testing.T) {
m := NewModel("testdata")
m := NewModel("testdata", 1e6)
files, ignores := m.Walk(false)
if l1, l2 := len(files), len(testdata); l1 != l2 {

View File

@@ -193,6 +193,33 @@ model, the Index Update merely amends it with new or updated file
information. Any files not mentioned in an Index Update are left
unchanged.
### Options (Type = 7)
This informational message provides information about the client
configuration, version, etc. It is sent at connection initiation and,
optionally, when any of the sent parameters have changed. The message is
in the form of a list of (key, value) pairs, both of string type.
struct OptionsMessage {
KeyValue Options<>;
}
struct KeyValue {
string Key;
string Value;
}
Key ID:s apart from the well known ones are implementation
specific. An implementation is expected to ignore unknown keys. An
implementation may impose limits on key and value size.
Well known keys:
- "clientId" -- The name of the implementation. Example: "syncthing".
- "clientVersion" -- The version of the client. Example: "v1.0.33-47". The
Following the SemVer 2.0 specification for version strings is
encouraged but not enforced.
Example Exchange
----------------

View File

@@ -65,6 +65,14 @@ func (w *marshalWriter) writeResponse(data []byte) {
w.writeBytes(data)
}
func (w *marshalWriter) writeOptions(opts map[string]string) {
w.writeUint32(uint32(len(opts)))
for k, v := range opts {
w.writeString(k)
w.writeString(v)
}
}
func (r *marshalReader) readHeader() header {
return decodeHeader(r.readUint32())
}
@@ -109,3 +117,14 @@ func (r *marshalReader) readRequest() request {
func (r *marshalReader) readResponse() []byte {
return r.readBytes()
}
func (r *marshalReader) readOptions() map[string]string {
n := r.readUint32()
opts := make(map[string]string, n)
for i := 0; i < int(n); i++ {
k := r.readString()
v := r.readString()
opts[k] = v
}
return opts
}

View File

@@ -117,3 +117,23 @@ func BenchmarkWriteRequest(b *testing.B) {
wr.writeRequest(req)
}
}
func TestOptions(t *testing.T) {
opts := map[string]string{
"foo": "bar",
"someKey": "otherValue",
"hello": "",
"": "42",
}
var buf = new(bytes.Buffer)
var wr = marshalWriter{w: buf}
wr.writeOptions(opts)
var rd = marshalReader{r: buf}
var ropts = rd.readOptions()
if !reflect.DeepEqual(opts, ropts) {
t.Error("Incorrect options marshal/demarshal")
}
}

View File

@@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"io"
"log"
"sync"
"time"
@@ -18,6 +19,7 @@ const (
messageTypePing = 4
messageTypePong = 5
messageTypeIndexUpdate = 6
messageTypeOptions = 7
)
const (
@@ -52,16 +54,18 @@ type Model interface {
type Connection struct {
sync.RWMutex
id string
receiver Model
reader io.Reader
mreader *marshalReader
writer io.Writer
mwriter *marshalWriter
closed bool
awaiting map[int]chan asyncResult
nextId int
indexSent map[string][2]int64
id string
receiver Model
reader io.Reader
mreader *marshalReader
writer io.Writer
mwriter *marshalWriter
closed bool
awaiting map[int]chan asyncResult
nextId int
indexSent map[string][2]int64
options map[string]string
optionsLock sync.Mutex
hasSentIndex bool
hasRecvdIndex bool
@@ -81,7 +85,7 @@ const (
pingIdleTime = 5 * time.Minute
)
func NewConnection(nodeID string, reader io.Reader, writer io.Writer, receiver Model) *Connection {
func NewConnection(nodeID string, reader io.Reader, writer io.Writer, receiver Model, options map[string]string) *Connection {
flrd := flate.NewReader(reader)
flwr, err := flate.NewWriter(writer, flate.BestSpeed)
if err != nil {
@@ -101,6 +105,20 @@ func NewConnection(nodeID string, reader io.Reader, writer io.Writer, receiver M
go c.readerLoop()
go c.pingerLoop()
if options != nil {
go func() {
c.Lock()
c.mwriter.writeHeader(header{0, c.nextId, messageTypeOptions})
c.mwriter.writeOptions(options)
err := c.flush()
if err != nil {
log.Printf("Warning:", err)
}
c.nextId++
c.Unlock()
}()
}
return &c
}
@@ -328,6 +346,11 @@ loop:
c.Unlock()
}
case messageTypeOptions:
c.optionsLock.Lock()
c.options = c.mreader.readOptions()
c.optionsLock.Unlock()
default:
c.close(fmt.Errorf("Protocol error: %s: unknown message type %#x", c.ID, hdr.msgType))
break loop
@@ -396,3 +419,9 @@ func (c *Connection) Statistics() Statistics {
return stats
}
func (c *Connection) Option(key string) string {
c.optionsLock.Lock()
defer c.optionsLock.Unlock()
return c.options[key]
}

View File

@@ -43,8 +43,8 @@ func TestPing(t *testing.T) {
ar, aw := io.Pipe()
br, bw := io.Pipe()
c0 := NewConnection("c0", ar, bw, nil)
c1 := NewConnection("c1", br, aw, nil)
c0 := NewConnection("c0", ar, bw, nil, nil)
c1 := NewConnection("c1", br, aw, nil, nil)
if ok := c0.ping(); !ok {
t.Error("c0 ping failed")
@@ -67,8 +67,8 @@ func TestPingErr(t *testing.T) {
eaw := &ErrPipe{PipeWriter: *aw, max: i, err: e}
ebw := &ErrPipe{PipeWriter: *bw, max: j, err: e}
c0 := NewConnection("c0", ar, ebw, m0)
NewConnection("c1", br, eaw, m1)
c0 := NewConnection("c0", ar, ebw, m0, nil)
NewConnection("c1", br, eaw, m1, nil)
res := c0.ping()
if (i < 4 || j < 4) && res {
@@ -94,8 +94,8 @@ func TestRequestResponseErr(t *testing.T) {
eaw := &ErrPipe{PipeWriter: *aw, max: i, err: e}
ebw := &ErrPipe{PipeWriter: *bw, max: j, err: e}
NewConnection("c0", ar, ebw, m0)
c1 := NewConnection("c1", br, eaw, m1)
NewConnection("c0", ar, ebw, m0, nil)
c1 := NewConnection("c1", br, eaw, m1, nil)
d, err := c1.Request("tn", 1234, 3456, []byte("hashbytes"))
if err == e || err == ErrClosed {
@@ -143,8 +143,8 @@ func TestVersionErr(t *testing.T) {
ar, aw := io.Pipe()
br, bw := io.Pipe()
c0 := NewConnection("c0", ar, bw, m0)
NewConnection("c1", br, aw, m1)
c0 := NewConnection("c0", ar, bw, m0, nil)
NewConnection("c1", br, aw, m1, nil)
c0.mwriter.writeHeader(header{
version: 2,
@@ -165,8 +165,8 @@ func TestTypeErr(t *testing.T) {
ar, aw := io.Pipe()
br, bw := io.Pipe()
c0 := NewConnection("c0", ar, bw, m0)
NewConnection("c1", br, aw, m1)
c0 := NewConnection("c0", ar, bw, m0, nil)
NewConnection("c1", br, aw, m1, nil)
c0.mwriter.writeHeader(header{
version: 0,
@@ -187,8 +187,8 @@ func TestClose(t *testing.T) {
ar, aw := io.Pipe()
br, bw := io.Pipe()
c0 := NewConnection("c0", ar, bw, m0)
NewConnection("c1", br, aw, m1)
c0 := NewConnection("c0", ar, bw, m0, nil)
NewConnection("c1", br, aw, m1, nil)
c0.close(nil)

52
usage.go Normal file
View File

@@ -0,0 +1,52 @@
package main
import (
"bytes"
"flag"
"fmt"
"io"
"text/tabwriter"
)
func optionTable(w io.Writer, rows [][]string) {
tw := tabwriter.NewWriter(w, 2, 4, 2, ' ', 0)
for _, row := range rows {
for i, cell := range row {
if i > 0 {
tw.Write([]byte("\t"))
}
tw.Write([]byte(cell))
}
tw.Write([]byte("\n"))
}
tw.Flush()
}
func usageFor(fs *flag.FlagSet, usage string) func() {
return func() {
var b bytes.Buffer
b.WriteString("Usage:\n " + usage + "\n")
var options [][]string
fs.VisitAll(func(f *flag.Flag) {
var dash = "-"
if len(f.Name) > 1 {
dash = "--"
}
var opt = " " + dash + f.Name
if f.DefValue != "false" {
opt += "=" + f.DefValue
}
options = append(options, []string{opt, f.Usage})
})
if len(options) > 0 {
b.WriteString("\nOptions:\n")
optionTable(&b, options)
}
fmt.Println(b.String())
}
}