Compare commits

..

7 Commits

Author SHA1 Message Date
Jakob Borg
832c0ffad0 Report CPU/mem usage in GUI 2014-01-10 00:12:32 +01:00
Jakob Borg
cb33f27f23 Woops: reignore .stignore 2014-01-09 23:00:42 +01:00
Jakob Borg
92dee7c082 Only fetch deps, don't build 2014-01-09 23:00:23 +01:00
Jakob Borg
b9af45bc6b Prepopulate ignore patterns (fixes #21) 2014-01-09 22:46:01 +01:00
Jakob Borg
a18f6c6d90 Do go get as part of build unless fast build requested (fixes #31) 2014-01-09 21:22:05 +01:00
Jakob Borg
6e11e3cda9 Build for Linux on ARM (fixes #32) 2014-01-09 21:17:41 +01:00
Jakob Borg
2935aebe53 Benchmarking 2014-01-09 14:11:55 +01:00
12 changed files with 418 additions and 175 deletions

View File

@@ -3,21 +3,27 @@
version=$(git describe --always)
buildDir=dist
if [[ -z $1 ]] ; then
if [[ $1 == "-f" ]] ; then
fast=yes
shift
fi
if [[ $fast != yes ]] ; then
go get -d
go test ./...
fi
if [[ -z $1 ]] ; then
go build -ldflags "-X main.Version $version" \
&& nrsc syncthing gui
elif [[ $1 == "tar" ]] ; then
go test ./...
go build -ldflags "-X main.Version $version" \
&& nrsc syncthing gui \
&& mkdir syncthing-dist \
&& cp syncthing README.md LICENSE syncthing-dist \
&& tar zcvf syncthing-dist.tar.gz syncthing-dist \
&& rm -rf syncthing-dist
else
go test ./... || exit 1
elif [[ $1 == "all" ]] ; then
rm -rf "$buildDir"
mkdir -p "$buildDir" || exit 1
@@ -38,6 +44,26 @@ else
done
done
for goos in linux ; do
for goarm in 5 6 7 ; do
for goarch in arm ; do
echo "$goos-${goarch}v$goarm"
export GOARM="$goarm"
export GOOS="$goos"
export GOARCH="$goarch"
export name="syncthing-$goos-${goarch}v$goarm"
go build -ldflags "-X main.Version $version" \
&& nrsc syncthing gui \
&& mkdir -p "$name" \
&& cp syncthing "$buildDir/$name" \
&& cp README.md LICENSE "$name" \
&& mv syncthing "$name" \
&& tar zcf "$buildDir/$name.tar.gz" "$name" \
&& rm -r "$name"
done
done
done
for goos in windows ; do
for goarch in amd64 386 ; do
echo "$goos-$goarch"

23
gui.go
View File

@@ -8,6 +8,8 @@ import (
"mime"
"net/http"
"path/filepath"
"runtime"
"sync"
"bitbucket.org/tebeka/nrsc"
"github.com/calmh/syncthing/model"
@@ -22,6 +24,7 @@ func startGUI(addr string, m *model.Model) {
router.Get("/rest/connections", restGetConnections)
router.Get("/rest/config", restGetConfig)
router.Get("/rest/need", restGetNeed)
router.Get("/rest/system", restGetSystem)
go func() {
mr := martini.New()
@@ -34,6 +37,7 @@ func startGUI(addr string, m *model.Model) {
warnln("GUI not possible:", err)
}
}()
}
func getRoot(w http.ResponseWriter, r *http.Request) {
@@ -100,6 +104,25 @@ func restGetNeed(m *model.Model, w http.ResponseWriter) {
json.NewEncoder(w).Encode(gfs)
}
var cpuUsagePercent float64
var cpuUsageLock sync.RWMutex
func restGetSystem(w http.ResponseWriter) {
var m runtime.MemStats
runtime.ReadMemStats(&m)
res := make(map[string]interface{})
res["goroutines"] = runtime.NumGoroutine()
res["alloc"] = m.Alloc
res["sys"] = m.Sys
cpuUsageLock.RLock()
res["cpuPercent"] = cpuUsagePercent
cpuUsageLock.RUnlock()
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(res)
}
func nrscStatic(path string) interface{} {
if err := nrsc.Initialize(); err != nil {
panic("Unable to initialize nrsc: " + err.Error())

View File

@@ -26,6 +26,9 @@ syncthing.controller('SyncthingCtrl', function ($scope, $http) {
});
$scope.refresh = function () {
$http.get("/rest/system").success(function (data) {
$scope.system = data;
});
$http.get("/rest/model").success(function (data) {
$scope.model = data;
modelGetSucceeded();
@@ -71,16 +74,19 @@ syncthing.controller('SyncthingCtrl', function ($scope, $http) {
setInterval($scope.refresh, 10000);
});
function decimals(num) {
if (num > 100) {
return 0;
}
if (num > 10) {
return 1;
}
return 2;
function decimals(val, num) {
if (val === 0) { return 0; }
var digits = Math.floor(Math.log(Math.abs(val))/Math.log(10));
var decimals = Math.max(0, num - digits);
return decimals;
}
syncthing.filter('natural', function() {
return function(input, valid) {
return input.toFixed(decimals(input, valid));
}
});
syncthing.filter('binary', function() {
return function(input) {
if (input === undefined) {
@@ -88,15 +94,15 @@ syncthing.filter('binary', function() {
}
if (input > 1024 * 1024 * 1024) {
input /= 1024 * 1024 * 1024;
return input.toFixed(decimals(input)) + ' Gi';
return input.toFixed(decimals(input, 2)) + ' Gi';
}
if (input > 1024 * 1024) {
input /= 1024 * 1024;
return input.toFixed(decimals(input)) + ' Mi';
return input.toFixed(decimals(input, 2)) + ' Mi';
}
if (input > 1024) {
input /= 1024;
return input.toFixed(decimals(input)) + ' Ki';
return input.toFixed(decimals(input, 2)) + ' Ki';
}
return Math.round(input) + ' ';
}
@@ -109,15 +115,15 @@ syncthing.filter('metric', function() {
}
if (input > 1000 * 1000 * 1000) {
input /= 1000 * 1000 * 1000;
return input.toFixed(decimals(input)) + ' G';
return input.toFixed(decimals(input, 2)) + ' G';
}
if (input > 1000 * 1000) {
input /= 1000 * 1000;
return input.toFixed(decimals(input)) + ' M';
return input.toFixed(decimals(input, 2)) + ' M';
}
if (input > 1000) {
input /= 1000;
return input.toFixed(decimals(input)) + ' k';
return input.toFixed(decimals(input, 2)) + ' k';
}
return Math.round(input) + ' ';
}

View File

@@ -47,27 +47,42 @@ html, body {
<div class="row">
<div class="col-md-12">
<h2>Synchronization</h2>
<div class="progress">
<div class="progress-bar" role="progressbar" aria-valuenow="60" aria-valuemin="0" aria-valuemax="100"
ng-class="{'progress-bar-success': model.needBytes === 0, 'progress-bar-info': model.needBytes !== 0}"
style="width: {{100 * model.inSyncBytes / model.globalBytes | number:2}}%;">
{{100 * model.inSyncBytes / model.globalBytes | number:0}}%
<div class="panel" ng-class="{'panel-success': model.needBytes === 0, 'panel-primary': model.needBytes !== 0}">
<div class="panel-heading"><h3 class="panel-title">Synchronization</h3></div>
<div class="panel-body">
<div class="progress">
<div class="progress-bar" role="progressbar" aria-valuenow="60" aria-valuemin="0" aria-valuemax="100"
ng-class="{'progress-bar-success': model.needBytes === 0, 'progress-bar-info': model.needBytes !== 0}"
style="width: {{100 * model.inSyncBytes / model.globalBytes | number:2}}%;">
{{100 * model.inSyncBytes / model.globalBytes | alwaysNumber | number:0}}%
</div>
</div>
<p ng-show="model.needBytes > 0">Need {{model.needFiles | alwaysNumber}} files, {{model.needBytes | binary}}B</p>
</div>
</div>
<p ng-show="model.needBytes > 0">Need {{model.needFiles | alwaysNumber}} files, {{model.needBytes | binary}}B</p>
</div>
</div>
<div class="row">
<div class="col-md-6">
<h1>Repository Status</h1>
<div class="panel panel-info">
<div class="panel-heading"><h3 class="panel-title">Repository</h3></div>
<div class="panel-body">
<p>Cluster contains {{model.globalFiles | alwaysNumber}} files, {{model.globalBytes | binary}}B
<span class="text-muted">(+{{model.globalDeleted | alwaysNumber}} delete records)</span></p>
<p>Cluster contains {{model.globalFiles | alwaysNumber}} files, {{model.globalBytes | binary}}B
<span class="text-muted">(+{{model.globalDeleted | alwaysNumber}} delete records)</span></p>
<p>Local repository has {{model.localFiles | alwaysNumber}} files, {{model.localBytes | binary}}B
<span class="text-muted">(+{{model.localDeleted | alwaysNumber}} delete records)</span></p>
</div>
</div>
<p>Local repository has {{model.localFiles | alwaysNumber}} files, {{model.localBytes | binary}}B
<span class="text-muted">(+{{model.localDeleted | alwaysNumber}} delete records)</span></p>
<div class="panel panel-info">
<div class="panel-heading"><h3 class="panel-title">System</h3></div>
<div class="panel-body">
<p>{{system.sys | binary}}B RAM allocated, {{system.alloc | binary}}B in use</p>
<p>{{system.cpuPercent | alwaysNumber | natural:1}}% CPU, {{system.goroutines | alwaysNumber}} goroutines</p>
</div>
</div>
<div ng-show="model.needFiles > 0">
<h2>Files to Synchronize</h2>
@@ -80,32 +95,34 @@ html, body {
</div>
</div>
<div class="col-md-6">
<h1>Cluster Status</h1>
<table class="table table-condensed">
<tbody>
<tr ng-repeat="(node, address) in config.nodes" ng-class="{'text-primary': !!connections[node]}">
<td><abbr class="text-monospace" title="{{node}}">{{node | short}}</abbr></td>
<td>
<span ng-show="!!connections[node]">
<span class="glyphicon glyphicon-link"></span>
{{connections[node].Address}}
</span>
<span ng-hide="!!connections[node]">
<span class="glyphicon glyphicon-cog"></span>
{{address}}
</span>
</td>
<td class="text-right">
<abbr title="{{connections[node].InBytesTotal | binary}}B">{{connections[node].inbps | metric}}b/s</abbr>
<span class="text-muted glyphicon glyphicon-cloud-download"></span>
</td>
<td class="text-right">
<abbr title="{{connections[node].OutBytesTotal | binary}}B">{{connections[node].outbps | metric}}b/s</abbr>
<span class="text-muted glyphicon glyphicon-cloud-upload"></span>
</td>
</tr>
</tbody>
</table>
<div class="panel panel-info">
<div class="panel-heading"><h3 class="panel-title">Cluster</h3></div>
<table class="table table-condensed">
<tbody>
<tr ng-repeat="(node, address) in config.nodes" ng-class="{'text-primary': !!connections[node]}">
<td><abbr class="text-monospace" title="{{node}}">{{node | short}}</abbr></td>
<td>
<span ng-show="!!connections[node]">
<span class="glyphicon glyphicon-link"></span>
{{connections[node].Address}}
</span>
<span ng-hide="!!connections[node]">
<span class="glyphicon glyphicon-cog"></span>
{{address}}
</span>
</td>
<td class="text-right">
<abbr title="{{connections[node].InBytesTotal | binary}}B">{{connections[node].inbps | metric}}b/s</abbr>
<span class="text-muted glyphicon glyphicon-cloud-download"></span>
</td>
<td class="text-right">
<abbr title="{{connections[node].OutBytesTotal | binary}}B">{{connections[node].outbps | metric}}b/s</abbr>
<span class="text-muted glyphicon glyphicon-cloud-upload"></span>
</td>
</tr>
</tbody>
</table>
</div>
</div>
</div>
</div>
@@ -123,7 +140,7 @@ html, body {
<div class="modal-content">
<div class="modal-header alert alert-danger">
<h4 class="modal-title">
<span class="glyphicon glyphicon-exclamation-sign"></span>
<span class="glyphicon glyphicon-exclamation-sign"></span>
Connection Error
</h4>
</div>

31
gui_unix.go Normal file
View File

@@ -0,0 +1,31 @@
//+build !windows
package main
import (
"syscall"
"time"
)
func init() {
go trackCPUUsage()
}
func trackCPUUsage() {
var prevUsage int64
var prevTime = time.Now().UnixNano()
var rusage syscall.Rusage
for {
time.Sleep(10 * time.Second)
syscall.Getrusage(syscall.RUSAGE_SELF, &rusage)
curTime := time.Now().UnixNano()
timeDiff := curTime - prevTime
curUsage := rusage.Utime.Nano() + rusage.Stime.Nano()
usageDiff := curUsage - prevUsage
cpuUsageLock.Lock()
cpuUsagePercent = 100 * float64(usageDiff) / float64(timeDiff)
cpuUsageLock.Unlock()
prevTime = curTime
prevUsage = curUsage
}
}

18
main.go
View File

@@ -10,6 +10,8 @@ import (
_ "net/http/pprof"
"os"
"path"
"runtime"
"runtime/debug"
"strconv"
"strings"
"time"
@@ -81,6 +83,14 @@ func main() {
os.Exit(0)
}
if len(os.Getenv("GOGC")) == 0 {
debug.SetGCPercent(25)
}
if len(os.Getenv("GOMAXPROCS")) == 0 {
runtime.GOMAXPROCS(runtime.NumCPU())
}
if len(opts.Debug.TraceModel) > 0 || opts.Debug.LogSource {
logger = log.New(os.Stderr, "", log.Lshortfile|log.Ldate|log.Ltime|log.Lmicroseconds)
}
@@ -282,7 +292,8 @@ listen:
for nodeID := range nodeAddrs {
if nodeID == remoteID {
m.AddConnection(conn, remoteID)
protoConn := protocol.NewConnection(remoteID, conn, conn, m)
m.AddConnection(conn, protoConn)
continue listen
}
}
@@ -351,7 +362,8 @@ func connect(myID string, addr string, nodeAddrs map[string][]string, m *model.M
continue
}
m.AddConnection(conn, remoteID)
protoConn := protocol.NewConnection(remoteID, conn, conn, m)
m.AddConnection(conn, protoConn)
continue nextNode
}
}
@@ -361,7 +373,7 @@ func connect(myID string, addr string, nodeAddrs map[string][]string, m *model.M
}
func updateLocalModel(m *model.Model) {
files := m.FilteredWalk(!opts.NoSymlinks)
files, _ := m.Walk(!opts.NoSymlinks)
m.ReplaceLocal(files)
saveIndex(m)
}

View File

@@ -31,12 +31,12 @@ type Model struct {
sync.RWMutex
dir string
global map[string]File // the latest version of each file as it exists in the cluster
local map[string]File // the files we currently have locally on disk
remote map[string]map[string]File
need map[string]bool // the files we need to update
nodes map[string]*protocol.Connection
rawConn map[string]io.ReadWriteCloser
global map[string]File // the latest version of each file as it exists in the cluster
local map[string]File // the files we currently have locally on disk
remote map[string]map[string]File
need map[string]bool // the files we need to update
protoConn map[string]Connection
rawConn map[string]io.Closer
updatedLocal int64 // timestamp of last update to local
updateGlobal int64 // timestamp of last update to remote
@@ -55,6 +55,13 @@ type Model struct {
fileWasSuppressed map[string]int
}
type Connection interface {
ID() string
Index([]protocol.FileInfo)
Request(name string, offset uint64, size uint32, hash []byte) ([]byte, error)
Statistics() protocol.Statistics
}
const (
idxBcastHoldtime = 15 * time.Second // Wait at least this long after the last index modification
idxBcastMaxDelay = 120 * time.Second // Unless we've already waited this long
@@ -78,8 +85,8 @@ func NewModel(dir string) *Model {
local: make(map[string]File),
remote: make(map[string]map[string]File),
need: make(map[string]bool),
nodes: make(map[string]*protocol.Connection),
rawConn: make(map[string]io.ReadWriteCloser),
protoConn: make(map[string]Connection),
rawConn: make(map[string]io.Closer),
lastIdxBcast: time.Now(),
trace: make(map[string]bool),
fileLastChanged: make(map[string]time.Time),
@@ -141,7 +148,7 @@ func (m *Model) ConnectionStats() map[string]ConnectionInfo {
defer m.RUnlock()
var res = make(map[string]ConnectionInfo)
for node, conn := range m.nodes {
for node, conn := range m.protoConn {
ci := ConnectionInfo{
Statistics: conn.Statistics(),
}
@@ -288,7 +295,7 @@ func (m *Model) Close(node string, err error) {
}
delete(m.remote, node)
delete(m.nodes, node)
delete(m.protoConn, node)
delete(m.rawConn, node)
m.recomputeGlobal()
@@ -383,7 +390,7 @@ func (m *Model) SeedLocal(fs []protocol.FileInfo) {
func (m *Model) ConnectedTo(nodeID string) bool {
m.RLock()
defer m.RUnlock()
_, ok := m.nodes[nodeID]
_, ok := m.protoConn[nodeID]
return ok
}
@@ -402,12 +409,11 @@ func (m *Model) RepoID() string {
// AddConnection adds a new peer connection to the model. An initial index will
// be sent to the connected peer, thereafter index updates whenever the local
// repository changes.
func (m *Model) AddConnection(conn io.ReadWriteCloser, nodeID string) {
node := protocol.NewConnection(nodeID, conn, conn, m)
func (m *Model) AddConnection(rawConn io.Closer, protoConn Connection) {
nodeID := protoConn.ID()
m.Lock()
m.nodes[nodeID] = node
m.rawConn[nodeID] = conn
m.protoConn[nodeID] = protoConn
m.rawConn[nodeID] = rawConn
m.Unlock()
m.RLock()
@@ -415,7 +421,7 @@ func (m *Model) AddConnection(conn io.ReadWriteCloser, nodeID string) {
m.RUnlock()
go func() {
node.Index(idx)
protoConn.Index(idx)
}()
}
@@ -461,7 +467,7 @@ func (m *Model) protocolIndex() []protocol.FileInfo {
func (m *Model) requestGlobal(nodeID, name string, offset uint64, size uint32, hash []byte) ([]byte, error) {
m.RLock()
nc, ok := m.nodes[nodeID]
nc, ok := m.protoConn[nodeID]
m.RUnlock()
if !ok {
return nil, fmt.Errorf("requestGlobal: no such node: %s", nodeID)
@@ -485,10 +491,10 @@ func (m *Model) broadcastIndexLoop() {
if bcastRequested && (holdtimeExceeded || maxDelayExceeded) {
m.Lock()
var indexWg sync.WaitGroup
indexWg.Add(len(m.nodes))
indexWg.Add(len(m.protoConn))
idx := m.protocolIndex()
m.lastIdxBcast = time.Now()
for _, node := range m.nodes {
for _, node := range m.protoConn {
node := node
if m.trace["net"] {
log.Printf("NET IDX(out/loop): %s: %d files", node.ID, len(idx))
@@ -547,10 +553,14 @@ func (m *Model) recomputeGlobal() {
newGlobal[n] = f
}
var highestMod int64
for _, fs := range m.remote {
for n, nf := range fs {
if lf, ok := newGlobal[n]; !ok || nf.NewerThan(lf) {
newGlobal[n] = nf
if nf.Modified > highestMod {
highestMod = nf.Modified
}
}
}
}
@@ -558,7 +568,7 @@ func (m *Model) recomputeGlobal() {
// Figure out if anything actually changed
var updated bool
if len(newGlobal) != len(m.global) {
if highestMod > m.updateGlobal || len(newGlobal) != len(m.global) {
updated = true
} else {
for n, f0 := range newGlobal {
@@ -616,14 +626,14 @@ func (m *Model) whoHas(name string) []string {
}
func fileFromFileInfo(f protocol.FileInfo) File {
var blocks []Block
var blocks = make([]Block, len(f.Blocks))
var offset uint64
for _, b := range f.Blocks {
blocks = append(blocks, Block{
for i, b := range f.Blocks {
blocks[i] = Block{
Offset: offset,
Length: b.Length,
Hash: b.Hash,
})
}
offset += uint64(b.Length)
}
return File{
@@ -636,12 +646,12 @@ func fileFromFileInfo(f protocol.FileInfo) File {
}
func fileInfoFromFile(f File) protocol.FileInfo {
var blocks []protocol.BlockInfo
for _, b := range f.Blocks {
blocks = append(blocks, protocol.BlockInfo{
var blocks = make([]protocol.BlockInfo, len(f.Blocks))
for i, b := range f.Blocks {
blocks[i] = protocol.BlockInfo{
Length: b.Length,
Hash: b.Hash,
})
}
}
return protocol.FileInfo{
Name: f.Name,

View File

@@ -2,6 +2,7 @@ package model
import (
"bytes"
"fmt"
"os"
"reflect"
"testing"
@@ -39,12 +40,6 @@ var testDataExpected = map[string]File{
Modified: 0,
Blocks: []Block{{Offset: 0x0, Length: 0xa, Hash: []uint8{0x2f, 0x72, 0xcc, 0x11, 0xa6, 0xfc, 0xd0, 0x27, 0x1e, 0xce, 0xf8, 0xc6, 0x10, 0x56, 0xee, 0x1e, 0xb1, 0x24, 0x3b, 0xe3, 0x80, 0x5b, 0xf9, 0xa9, 0xdf, 0x98, 0xf9, 0x2f, 0x76, 0x36, 0xb0, 0x5c}}},
},
"baz/quux": File{
Name: "baz/quux",
Flags: 0,
Modified: 0,
Blocks: []Block{{Offset: 0x0, Length: 0x9, Hash: []uint8{0xc1, 0x54, 0xd9, 0x4e, 0x94, 0xba, 0x72, 0x98, 0xa6, 0xad, 0xb0, 0x52, 0x3a, 0xfe, 0x34, 0xd1, 0xb6, 0xa5, 0x81, 0xd6, 0xb8, 0x93, 0xa7, 0x63, 0xd4, 0x5d, 0xdc, 0x5e, 0x20, 0x9d, 0xcb, 0x83}}},
},
}
func init() {
@@ -406,3 +401,113 @@ func TestIgnoreWithUnknownFlags(t *testing.T) {
t.Error("Model not should include", invalid)
}
}
func prepareModel(n int, m *Model) []protocol.FileInfo {
fs, _ := m.Walk(false)
m.ReplaceLocal(fs)
files := make([]protocol.FileInfo, n)
t := time.Now().Unix()
for i := 0; i < n; i++ {
files[i] = protocol.FileInfo{
Name: fmt.Sprintf("file%d", i),
Modified: t,
Blocks: []protocol.BlockInfo{{100, []byte("some hash bytes")}},
}
}
m.Index("42", files)
return files
}
func BenchmarkRecomputeGlobal10k(b *testing.B) {
m := NewModel("testdata")
prepareModel(10000, m)
b.ResetTimer()
for i := 0; i < b.N; i++ {
m.recomputeGlobal()
}
}
func BenchmarkRecomputeNeed10K(b *testing.B) {
m := NewModel("testdata")
prepareModel(10000, m)
b.ResetTimer()
for i := 0; i < b.N; i++ {
m.recomputeNeed()
}
}
func BenchmarkIndexUpdate10000(b *testing.B) {
m := NewModel("testdata")
files := prepareModel(10000, m)
b.ResetTimer()
for i := 0; i < b.N; i++ {
m.IndexUpdate("42", files)
}
}
type FakeConnection struct {
id string
requestData []byte
}
func (FakeConnection) Close() error {
return nil
}
func (f FakeConnection) ID() string {
return string(f.id)
}
func (FakeConnection) Index([]protocol.FileInfo) {}
func (f FakeConnection) Request(name string, offset uint64, size uint32, hash []byte) ([]byte, error) {
return f.requestData, nil
}
func (FakeConnection) Ping() bool {
return true
}
func (FakeConnection) Statistics() protocol.Statistics {
return protocol.Statistics{}
}
func BenchmarkRequest(b *testing.B) {
m := NewModel("testdata")
fs, _ := m.Walk(false)
m.ReplaceLocal(fs)
const n = 1000
files := make([]protocol.FileInfo, n)
t := time.Now().Unix()
for i := 0; i < n; i++ {
files[i] = protocol.FileInfo{
Name: fmt.Sprintf("file%d", i),
Modified: t,
Blocks: []protocol.BlockInfo{{100, []byte("some hash bytes")}},
}
}
fc := FakeConnection{
id: "42",
requestData: []byte("some data to return"),
}
m.AddConnection(fc, fc)
m.Index("42", files)
b.ResetTimer()
for i := 0; i < b.N; i++ {
data, err := m.requestGlobal("42", files[i%n].Name, 0, 32, nil)
if err != nil {
b.Error(err)
}
if data == nil {
b.Error("nil data")
}
}
}

View File

@@ -49,16 +49,12 @@ func tempName(name string, modified int64) string {
return path.Join(tdir, tname)
}
func (m *Model) genWalker(res *[]File, ign map[string][]string) filepath.WalkFunc {
func (m *Model) loadIgnoreFiles(ign map[string][]string) filepath.WalkFunc {
return func(p string, info os.FileInfo, err error) error {
if err != nil {
return nil
}
if isTempName(p) {
return nil
}
rn, err := filepath.Rel(m.dir, p)
if err != nil {
return nil
@@ -75,6 +71,36 @@ func (m *Model) genWalker(res *[]File, ign map[string][]string) filepath.WalkFun
}
}
ign[pn] = patterns
}
return nil
}
}
func (m *Model) walkAndHashFiles(res *[]File, ign map[string][]string) filepath.WalkFunc {
return func(p string, info os.FileInfo, err error) error {
if err != nil {
return nil
}
if isTempName(p) {
return nil
}
rn, err := filepath.Rel(m.dir, p)
if err != nil {
return nil
}
if _, sn := path.Split(rn); sn == ".stignore" {
// We never sync the .stignore files
return nil
}
if ignoreFile(ign, rn) {
if m.trace["file"] {
log.Println("FILE: IGNORE:", rn)
}
return nil
}
@@ -149,8 +175,11 @@ func (m *Model) genWalker(res *[]File, ign map[string][]string) filepath.WalkFun
// file system. Files are blockwise hashed.
func (m *Model) Walk(followSymlinks bool) (files []File, ignore map[string][]string) {
ignore = make(map[string][]string)
fn := m.genWalker(&files, ignore)
filepath.Walk(m.dir, fn)
hashFiles := m.walkAndHashFiles(&files, ignore)
filepath.Walk(m.dir, m.loadIgnoreFiles(ignore))
filepath.Walk(m.dir, hashFiles)
if followSymlinks {
d, err := os.Open(m.dir)
@@ -166,7 +195,9 @@ func (m *Model) Walk(followSymlinks bool) (files []File, ignore map[string][]str
for _, fi := range fis {
if fi.Mode()&os.ModeSymlink != 0 {
filepath.Walk(path.Join(m.dir, fi.Name())+"/", fn)
dir := path.Join(m.dir, fi.Name()) + "/"
filepath.Walk(dir, m.loadIgnoreFiles(ignore))
filepath.Walk(dir, hashFiles)
}
}
}
@@ -174,14 +205,6 @@ func (m *Model) Walk(followSymlinks bool) (files []File, ignore map[string][]str
return
}
// Walk returns the list of files found in the local repository by scanning the
// file system. Files are blockwise hashed. Patterns marked in .stignore files
// are removed from the results.
func (m *Model) FilteredWalk(followSymlinks bool) []File {
var files, ignored = m.Walk(followSymlinks)
return ignoreFilter(ignored, files)
}
func (m *Model) cleanTempFile(path string, info os.FileInfo, err error) error {
if err != nil {
return err
@@ -200,19 +223,24 @@ func (m *Model) cleanTempFiles() {
}
func ignoreFilter(patterns map[string][]string, files []File) (filtered []File) {
nextFile:
for _, f := range files {
first, last := path.Split(f.Name)
for prefix, pats := range patterns {
if len(prefix) == 0 || prefix == first || strings.HasPrefix(first, prefix+"/") {
for _, pattern := range pats {
if match, _ := path.Match(pattern, last); match {
continue nextFile
}
}
}
if !ignoreFile(patterns, f.Name) {
filtered = append(filtered, f)
}
filtered = append(filtered, f)
}
return filtered
}
func ignoreFile(patterns map[string][]string, file string) bool {
first, last := path.Split(file)
for prefix, pats := range patterns {
if len(prefix) == 0 || prefix == first || strings.HasPrefix(first, prefix+"/") {
for _, pattern := range pats {
if match, _ := path.Match(pattern, last); match {
return true
}
}
}
}
return false
}

View File

@@ -13,7 +13,6 @@ var testdata = []struct {
hash string
}{
{"bar", 10, "2f72cc11a6fcd0271ecef8c61056ee1eb1243be3805bf9a9df98f92f7636b05c"},
{"baz/quux", 9, "c154d94e94ba7298a6adb0523afe34d1b6a581d6b893a763d45ddc5e209dcb83"},
{"foo", 7, "aec070645fe53ee3b3763059376134f058cc337247c978add178b6ccdfb0019f"},
}
@@ -50,21 +49,6 @@ func TestWalk(t *testing.T) {
}
}
func TestFilteredWalk(t *testing.T) {
m := NewModel("testdata")
files := m.FilteredWalk(false)
if len(files) != 2 {
t.Fatalf("Incorrect number of walked filtered files %d != 2", len(files))
}
if files[0].Name != "bar" {
t.Error("Incorrect first file", files[0])
}
if files[1].Name != "foo" {
t.Error("Incorrect second file", files[1])
}
}
func TestIgnore(t *testing.T) {
var patterns = map[string][]string{
"": {"t2"},

View File

@@ -52,7 +52,7 @@ type Model interface {
type Connection struct {
sync.RWMutex
ID string
id string
receiver Model
reader io.Reader
mreader *marshalReader
@@ -89,13 +89,13 @@ func NewConnection(nodeID string, reader io.Reader, writer io.Writer, receiver M
}
c := Connection{
id: nodeID,
receiver: receiver,
reader: flrd,
mreader: &marshalReader{r: flrd},
writer: flwr,
mwriter: &marshalWriter{w: flwr},
awaiting: make(map[int]chan asyncResult),
ID: nodeID,
}
go c.readerLoop()
@@ -104,6 +104,10 @@ func NewConnection(nodeID string, reader io.Reader, writer io.Writer, receiver M
return &c
}
func (c *Connection) ID() string {
return c.id
}
// Index writes the list of file information to the connected peer node
func (c *Connection) Index(idx []FileInfo) {
c.Lock()
@@ -137,10 +141,10 @@ func (c *Connection) Index(idx []FileInfo) {
c.Unlock()
if err != nil {
c.Close(err)
c.close(err)
return
} else if c.mwriter.err != nil {
c.Close(c.mwriter.err)
c.close(c.mwriter.err)
return
}
}
@@ -158,13 +162,13 @@ func (c *Connection) Request(name string, offset uint64, size uint32, hash []byt
c.mwriter.writeRequest(request{name, offset, size, hash})
if c.mwriter.err != nil {
c.Unlock()
c.Close(c.mwriter.err)
c.close(c.mwriter.err)
return nil, c.mwriter.err
}
err := c.flush()
if err != nil {
c.Unlock()
c.Close(err)
c.close(err)
return nil, err
}
c.nextId = (c.nextId + 1) & 0xfff
@@ -177,7 +181,7 @@ func (c *Connection) Request(name string, offset uint64, size uint32, hash []byt
return res.val, res.err
}
func (c *Connection) Ping() bool {
func (c *Connection) ping() bool {
c.Lock()
if c.closed {
c.Unlock()
@@ -189,11 +193,11 @@ func (c *Connection) Ping() bool {
err := c.flush()
if err != nil {
c.Unlock()
c.Close(err)
c.close(err)
return false
} else if c.mwriter.err != nil {
c.Unlock()
c.Close(c.mwriter.err)
c.close(c.mwriter.err)
return false
}
c.nextId = (c.nextId + 1) & 0xfff
@@ -203,9 +207,6 @@ func (c *Connection) Ping() bool {
return ok && res.err == nil
}
func (c *Connection) Stop() {
}
type flusher interface {
Flush() error
}
@@ -217,7 +218,7 @@ func (c *Connection) flush() error {
return nil
}
func (c *Connection) Close(err error) {
func (c *Connection) close(err error) {
c.Lock()
if c.closed {
c.Unlock()
@@ -230,7 +231,7 @@ func (c *Connection) Close(err error) {
c.awaiting = nil
c.Unlock()
c.receiver.Close(c.ID, err)
c.receiver.Close(c.id, err)
}
func (c *Connection) isClosed() bool {
@@ -244,11 +245,11 @@ loop:
for {
hdr := c.mreader.readHeader()
if c.mreader.err != nil {
c.Close(c.mreader.err)
c.close(c.mreader.err)
break loop
}
if hdr.version != 0 {
c.Close(fmt.Errorf("Protocol error: %s: unknown message version %#x", c.ID, hdr.version))
c.close(fmt.Errorf("Protocol error: %s: unknown message version %#x", c.ID, hdr.version))
break loop
}
@@ -256,10 +257,10 @@ loop:
case messageTypeIndex:
files := c.mreader.readIndex()
if c.mreader.err != nil {
c.Close(c.mreader.err)
c.close(c.mreader.err)
break loop
} else {
c.receiver.Index(c.ID, files)
c.receiver.Index(c.id, files)
}
c.Lock()
c.hasRecvdIndex = true
@@ -268,16 +269,16 @@ loop:
case messageTypeIndexUpdate:
files := c.mreader.readIndex()
if c.mreader.err != nil {
c.Close(c.mreader.err)
c.close(c.mreader.err)
break loop
} else {
c.receiver.IndexUpdate(c.ID, files)
c.receiver.IndexUpdate(c.id, files)
}
case messageTypeRequest:
req := c.mreader.readRequest()
if c.mreader.err != nil {
c.Close(c.mreader.err)
c.close(c.mreader.err)
break loop
}
go c.processRequest(hdr.msgID, req)
@@ -286,7 +287,7 @@ loop:
data := c.mreader.readResponse()
if c.mreader.err != nil {
c.Close(c.mreader.err)
c.close(c.mreader.err)
break loop
} else {
c.Lock()
@@ -306,10 +307,10 @@ loop:
err := c.flush()
c.Unlock()
if err != nil {
c.Close(err)
c.close(err)
break loop
} else if c.mwriter.err != nil {
c.Close(c.mwriter.err)
c.close(c.mwriter.err)
break loop
}
@@ -328,14 +329,14 @@ loop:
}
default:
c.Close(fmt.Errorf("Protocol error: %s: unknown message type %#x", c.ID, hdr.msgType))
c.close(fmt.Errorf("Protocol error: %s: unknown message type %#x", c.ID, hdr.msgType))
break loop
}
}
}
func (c *Connection) processRequest(msgID int, req request) {
data, _ := c.receiver.Request(c.ID, req.name, req.offset, req.size, req.hash)
data, _ := c.receiver.Request(c.id, req.name, req.offset, req.size, req.hash)
c.Lock()
c.mwriter.writeUint32(encodeHeader(header{0, msgID, messageTypeResponse}))
@@ -345,9 +346,9 @@ func (c *Connection) processRequest(msgID int, req request) {
buffers.Put(data)
if err != nil {
c.Close(err)
c.close(err)
} else if c.mwriter.err != nil {
c.Close(c.mwriter.err)
c.close(c.mwriter.err)
}
}
@@ -362,15 +363,15 @@ func (c *Connection) pingerLoop() {
if ready {
go func() {
rc <- c.Ping()
rc <- c.ping()
}()
select {
case ok := <-rc:
if !ok {
c.Close(fmt.Errorf("Ping failure"))
c.close(fmt.Errorf("Ping failure"))
}
case <-time.After(pingTimeout):
c.Close(fmt.Errorf("Ping timeout"))
c.close(fmt.Errorf("Ping timeout"))
}
}
}

View File

@@ -46,10 +46,10 @@ func TestPing(t *testing.T) {
c0 := NewConnection("c0", ar, bw, nil)
c1 := NewConnection("c1", br, aw, nil)
if ok := c0.Ping(); !ok {
if ok := c0.ping(); !ok {
t.Error("c0 ping failed")
}
if ok := c1.Ping(); !ok {
if ok := c1.ping(); !ok {
t.Error("c1 ping failed")
}
}
@@ -70,7 +70,7 @@ func TestPingErr(t *testing.T) {
c0 := NewConnection("c0", ar, ebw, m0)
NewConnection("c1", br, eaw, m1)
res := c0.Ping()
res := c0.ping()
if (i < 4 || j < 4) && res {
t.Errorf("Unexpected ping success; i=%d, j=%d", i, j)
} else if (i >= 8 && j >= 8) && !res {
@@ -190,7 +190,7 @@ func TestClose(t *testing.T) {
c0 := NewConnection("c0", ar, bw, m0)
NewConnection("c1", br, aw, m1)
c0.Close(nil)
c0.close(nil)
ok := c0.isClosed()
if !ok {
@@ -199,7 +199,7 @@ func TestClose(t *testing.T) {
// None of these should panic, some should return an error
ok = c0.Ping()
ok = c0.ping()
if ok {
t.Error("Ping should not return true")
}