Compare commits

...

19 Commits

| Author | SHA1 | Message | Date |
| --- | --- | --- | --- |
| Jakob Borg | 69ef4d261d | Unbreak build script | 2014-01-07 17:07:46 +01:00 |
| Jakob Borg | 91c102e4fe | Syncronize file mode (fixes #20) | 2014-01-07 16:38:07 +01:00 |
| Jakob Borg | b4db177045 | Allow deletes per default (fixes #19) | 2014-01-07 16:15:18 +01:00 |
| Jakob Borg | 340c9095dd | Suppress frequent changes to files (fixes #12) | 2014-01-07 16:10:38 +01:00 |
| Jakob Borg | e3bc33dc88 | Move binary to build destination | 2014-01-07 12:14:50 +01:00 |
| Jakob Borg | eebc145055 | Point to the wiki for documentation (fixes #10) | 2014-01-07 12:07:56 +01:00 |
| Jakob Borg | 92b01fa48a | Build tar file for current OS/architecture | 2014-01-07 11:52:42 +01:00 |
| Jakob Borg | 2a0d1ab294 | Merge pull request #18 from jpjp/patch-1 (synch -> sync) | 2014-01-07 02:33:55 -08:00 |
| jpjp | 2bdab426ff | synch -> sync | 2014-01-06 22:27:57 +01:00 |
| Jakob Borg | e769de9986 | Announce read/write/delete status on startup (fixes #14) | 2014-01-06 21:41:29 +01:00 |
| Jakob Borg | 4b11e66914 | Verify requests against model (fixes #15) | 2014-01-06 21:31:36 +01:00 |
| Jakob Borg | 28d3936a3c | Add .stignore to testdata | 2014-01-06 21:31:20 +01:00 |
| Jakob Borg | 986b15573a | Ignore files matching patterns in .stignore (fixes #7) | 2014-01-06 21:17:18 +01:00 |
| Jakob Borg | 46d828e349 | Expand tilde in repository path (fixes #8) | 2014-01-06 19:37:26 +01:00 |
| Jakob Borg | 48603a1619 | Do quick build even on test failure | 2014-01-06 15:50:15 +01:00 |
| Jakob Borg | 17d5f2bbfc | Fix model test | 2014-01-06 15:49:51 +01:00 |
| Jakob Borg | b64af73607 | Dead code removal | 2014-01-06 12:45:43 +01:00 |
| Jakob Borg | c9cce9613e | Refactor out the model into a subpackage | 2014-01-06 11:11:18 +01:00 |
| Jakob Borg | 1392905d63 | Link to issues | 2014-01-06 08:19:33 +01:00 |
19 changed files with 714 additions and 604 deletions

README.md

@@ -25,163 +25,11 @@ making sure large swarms of selfish agents behave and somehow work
towards a common goal. Here we have a much smaller swarm of cooperative
agents and a simpler approach will suffice.
Features
--------
Documentation
=============
The following features are _currently implemented and working_:
* The formation of a cluster of nodes, certificate authenticated and
communicating over TLS over TCP.
* Synchronization of a single directory among the cluster nodes.
* Change detection by periodic scanning of the local repository.
* Static configuration of cluster nodes.
* Automatic discovery of cluster nodes. See [discover.go][discover.go]
for the protocol specification. Discovery on the LAN is performed by
broadcasts, Internet wide discovery is performed with the assistance
of a global server.
* Handling of deleted files. Deletes can be propagated or ignored per
client.
* Synchronizing multiple unrelated directory trees by following
symlinks directly below the repository level.
* HTTP GUI.
The following features are _not yet implemented but planned_:
* Change detection by listening to file system notifications instead of
periodic scanning.
The following features are _not implemented but may be implemented_ in
the future:
* Syncing multiple directories from the same syncthing instance.
* Automatic NAT handling via UPNP.
* Conflict resolution. Currently whichever file has the newest
modification time "wins". The correct behavior in the face of
conflicts is open for discussion.
[discover.go]: https://github.com/calmh/syncthing/blob/master/discover/discover.go
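The "newest modification time wins" rule mentioned above can be sketched as follows. This is a minimal illustration, not the project's actual code; `File` here is a hypothetical stand-in with only the fields needed for the comparison:

```go
package main

import "fmt"

// File is a hypothetical stand-in for the real model's file type; only
// the fields needed for conflict resolution are included.
type File struct {
	Name     string
	Modified int64 // Unix timestamp
}

// resolve picks between two conflicting versions of the same file:
// whichever has the newer modification time wins (ties keep the first).
func resolve(a, b File) File {
	if b.Modified > a.Modified {
		return b
	}
	return a
}

func main() {
	local := File{Name: "doc.txt", Modified: 1389000000}
	remote := File{Name: "doc.txt", Modified: 1389000100}
	fmt.Println(resolve(local, remote).Modified)
}
```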
Security
--------
Security is one of the primary project goals. This means that it should
not be possible for an attacker to join a cluster uninvited, and it
should not be possible to extract private information from intercepted
traffic. Currently this is implemented as follows.
All traffic is protected by TLS. To prevent uninvited nodes from joining
a cluster, the certificate fingerprint of each node is compared to a
preset list of acceptable nodes at connection establishment. The
fingerprint is computed as the SHA-1 hash of the certificate and
displayed in BASE32 encoding to form a compact yet convenient string.
Currently SHA-1 is deemed secure against preimage attacks.
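A rough sketch of the fingerprint scheme described above: SHA-1 over the certificate bytes, rendered in BASE32. Which certificate encoding (DER, PEM, ...) is hashed is an assumption here; the point is that 20 hash bytes encode to exactly 32 BASE32 characters, matching the ID strings shown later in this README:

```go
package main

import (
	"crypto/sha1"
	"encoding/base32"
	"fmt"
)

// nodeID computes a fingerprint as the text describes: the SHA-1 hash of
// the certificate, BASE32-encoded. 160 bits / 5 bits per character = 32
// characters, with no padding needed.
func nodeID(certBytes []byte) string {
	sum := sha1.Sum(certBytes)
	return base32.StdEncoding.EncodeToString(sum[:])
}

func main() {
	fmt.Println(nodeID([]byte("example certificate")))
}
```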
Installing
==========
Download the appropriate precompiled binary from the
[releases](https://github.com/calmh/syncthing/releases) page. Untar and
put the `syncthing` binary somewhere convenient in your `$PATH`.
If you are a developer and have Go 1.2 installed you can also install
the latest version from source. `go get` works as expected but builds
a binary without GUI capabilities. Use the included `build.sh` script
without parameters to build a syncthing with GUI.
Usage
=====
Check out the options:
```
$ syncthing --help
Usage:
syncthing [options]
...
```
Run syncthing to let it create its config directory and certificate:
```
$ syncthing
11:34:13 main.go:85: INFO: Version v0.1-40-gbb0fd87
11:34:13 tls.go:61: OK: Created TLS certificate file
11:34:13 tls.go:67: OK: Created TLS key file
11:34:13 main.go:66: INFO: My ID: NCTBZAAHXR6ZZP3D7SL3DLYFFQERMW4Q
11:34:13 main.go:90: FATAL: No config file
```
Take note of the "My ID: ..." line. Perform the same operation on
another computer to create another node. Take note of that ID as well,
and create a config file `~/.syncthing/syncthing.ini` looking something
like this:
```
[repository]
dir = /Users/jb/Synced
[nodes]
NCTBZAAHXR6ZZP3D7SL3DLYFFQERMW4Q = 172.16.32.1:22000 192.23.34.56:22000
CUGAE43Y5N64CRJU26YFH6MTWPSBLSUL = dynamic
```
This assumes that the first node is reachable on either of the two
addresses listed (perhaps one internal and one port-forwarded external)
and that the other node is not normally reachable from the outside. Save
this config file, identically, to both nodes.
If the nodes are running on the same network, or reachable on port 22000
from the outside world, you can set all addresses to "dynamic" and they
will find each other using automatic discovery. (This discovery,
including port numbers, can be tweaked or disabled using command line
options.)
Start syncthing on both nodes. For the cautious, one side can be set to
be read only.
```
$ syncthing --ro
13:30:55 main.go:85: INFO: Version v0.1-40-gbb0fd87
13:30:55 main.go:102: INFO: My ID: NCTBZAAHXR6ZZP3D7SL3DLYFFQERMW4Q
13:30:55 main.go:149: INFO: Initial repository scan in progress
13:30:59 main.go:153: INFO: Listening for incoming connections
13:30:59 main.go:157: INFO: Attempting to connect to other nodes
13:30:59 main.go:247: INFO: Starting local discovery
13:30:59 main.go:165: OK: Ready to synchronize
13:31:04 discover.go:113: INFO: Discovered node CUGAE43Y5N64CRJU26YFH6MTWPSBLSUL at 172.16.32.24:22000
13:31:14 main.go:296: INFO: Connected to node CUGAE43Y5N64CRJU26YFH6MTWPSBLSUL
13:31:19 main.go:345: INFO: Transferred 139 KiB in (14 KiB/s), 139 KiB out (14 KiB/s)
13:32:20 model.go:94: INFO: CUGAE43Y5N64CRJU26YFH6MTWPSBLSUL: 263.4 KB/s in, 69.1 KB/s out
13:32:20 model.go:104: INFO: 18289 files, 24.24 GB in cluster
13:32:20 model.go:111: INFO: 17132 files, 22.39 GB in local repo
13:32:20 model.go:117: INFO: 1157 files, 1.84 GB to synchronize
...
```
You should see the synchronization start and then finish a short while
later. Add nodes to taste.
GUI
---
The web based GUI is disabled per default. To enable and access it you
must start syncthing with the `--gui` command line option, giving a
listen address. For example:
```
$ syncthing --gui 127.0.0.1:8080
```
You then point your browser to the given address.
The syncthing documentation is kept on the
[GitHub Wiki](https://github.com/calmh/syncthing/wiki).
License
=======


@@ -4,8 +4,17 @@ version=$(git describe --always)
buildDir=dist
if [[ -z $1 ]] ; then
go test ./...
go build -ldflags "-X main.Version $version" \
&& nrsc syncthing gui
elif [[ $1 == "tar" ]] ; then
go test ./...
go build -ldflags "-X main.Version $version" \
&& nrsc syncthing gui \
&& mkdir syncthing-dist \
&& cp syncthing README.md LICENSE syncthing-dist \
&& tar zcvf syncthing-dist.tar.gz syncthing-dist \
&& rm -rf syncthing-dist
else
go test ./... || exit 1
@@ -38,7 +47,7 @@ else
go build -ldflags "-X main.Version $version" \
&& nrsc syncthing.exe gui \
&& mkdir -p "$name" \
&& cp syncthing.exe "$buildDir/$name.exe" \
&& mv syncthing.exe "$buildDir/$name.exe" \
&& cp README.md LICENSE "$name" \
&& zip -qr "$buildDir/$name.zip" "$name" \
&& rm -r "$name"

gui.go

@@ -8,12 +8,13 @@ import (
"mime"
"net/http"
"path/filepath"
"bitbucket.org/tebeka/nrsc"
"bitbucket.org/tebeka/nrsc"
"github.com/calmh/syncthing/model"
"github.com/codegangsta/martini"
)
func startGUI(addr string, m *Model) {
func startGUI(addr string, m *model.Model) {
router := martini.NewRouter()
router.Get("/", getRoot)
router.Get("/rest/version", restGetVersion)
@@ -40,7 +41,7 @@ func restGetVersion() string {
return Version
}
func restGetModel(m *Model, w http.ResponseWriter) {
func restGetModel(m *model.Model, w http.ResponseWriter) {
var res = make(map[string]interface{})
globalFiles, globalDeleted, globalBytes := m.GlobalSize()
@@ -59,7 +60,7 @@ func restGetModel(m *Model, w http.ResponseWriter) {
json.NewEncoder(w).Encode(res)
}
func restGetConnections(m *Model, w http.ResponseWriter) {
func restGetConnections(m *model.Model, w http.ResponseWriter) {
var res = m.ConnectionStats()
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(res)
@@ -73,14 +74,27 @@ func restGetConfig(w http.ResponseWriter) {
json.NewEncoder(w).Encode(res)
}
func restGetNeed(m *Model, w http.ResponseWriter) {
type guiFile model.File
func (f guiFile) MarshalJSON() ([]byte, error) {
type t struct {
Name string
Size int
}
return json.Marshal(t{
Name: f.Name,
Size: model.File(f).Size(),
})
}
func restGetNeed(m *model.Model, w http.ResponseWriter) {
files, _ := m.NeedFiles()
if files == nil {
// We don't want the empty list to serialize as "null\n"
files = make([]FileInfo, 0)
gfs := make([]guiFile, len(files))
for i, f := range files {
gfs[i] = guiFile(f)
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(files)
json.NewEncoder(w).Encode(gfs)
}
func nrscStatic(path string) interface{} {

main.go

@@ -2,9 +2,7 @@ package main
import (
"compress/gzip"
"crypto/sha1"
"crypto/tls"
"fmt"
"log"
"net"
"net/http"
@@ -18,6 +16,7 @@ import (
"github.com/calmh/ini"
"github.com/calmh/syncthing/discover"
flags "github.com/calmh/syncthing/github.com/jessevdk/go-flags"
"github.com/calmh/syncthing/model"
"github.com/calmh/syncthing/protocol"
)
@@ -25,8 +24,8 @@ type Options struct {
ConfDir string `short:"c" long:"cfg" description:"Configuration directory" default:"~/.syncthing" value-name:"DIR"`
Listen string `short:"l" long:"listen" description:"Listen address" default:":22000" value-name:"ADDR"`
ReadOnly bool `short:"r" long:"ro" description:"Repository is read only"`
Delete bool `short:"d" long:"delete" description:"Delete files deleted from cluster"`
Rehash bool `long:"rehash" description:"Ignore cache and rehash all files in repository"`
NoDelete bool `long:"no-delete" description:"Never delete files"`
NoSymlinks bool `long:"no-symlinks" description:"Don't follow first level symlinks in the repo"`
NoStats bool `long:"no-stats" description:"Don't print model and connection statistics"`
GUIAddr string `long:"gui" description:"GUI listen address" default:"" value-name:"ADDR"`
@@ -36,12 +35,10 @@ type Options struct {
}
type DebugOptions struct {
LogSource bool `long:"log-source"`
TraceFile bool `long:"trace-file"`
TraceNet bool `long:"trace-net"`
TraceIdx bool `long:"trace-idx"`
TraceNeed bool `long:"trace-need"`
Profiler string `long:"profiler" value-name:"ADDR"`
LogSource bool `long:"log-source"`
TraceModel []string `long:"trace-model" value-name:"TRACE" description:"idx, net, file, need"`
TraceConnect bool `long:"trace-connect"`
Profiler string `long:"profiler" value-name:"ADDR"`
}
type DiscoveryOptions struct {
@@ -62,7 +59,6 @@ var opts Options
var Version string = "unknown-dev"
const (
confDirName = ".syncthing"
confFileName = "syncthing.ini"
)
@@ -76,12 +72,10 @@ func main() {
if err != nil {
os.Exit(0)
}
if opts.Debug.TraceFile || opts.Debug.TraceIdx || opts.Debug.TraceNet || opts.Debug.LogSource {
if len(opts.Debug.TraceModel) > 0 || opts.Debug.LogSource {
logger = log.New(os.Stderr, "", log.Lshortfile|log.Ldate|log.Ltime|log.Lmicroseconds)
}
if strings.HasPrefix(opts.ConfDir, "~/") {
opts.ConfDir = strings.Replace(opts.ConfDir, "~", getHomeDir(), 1)
}
opts.ConfDir = expandTilde(opts.ConfDir)
infoln("Version", Version)
@@ -128,7 +122,7 @@ func main() {
config = ini.Parse(cf)
cf.Close()
var dir = config.Get("repository", "dir")
var dir = expandTilde(config.Get("repository", "dir"))
// Create a map of desired node connections based on the configuration file
// directives.
@@ -139,7 +133,10 @@ func main() {
}
ensureDir(dir, -1)
m := NewModel(dir)
m := model.NewModel(dir)
for _, t := range opts.Debug.TraceModel {
m.Trace(t)
}
// GUI
if opts.GUIAddr != "" {
@@ -167,10 +164,15 @@ func main() {
// Routine to pull blocks from other nodes to synchronize the local
// repository. Does not run when we are in read only (publish only) mode.
if !opts.ReadOnly {
infoln("Cleaning out incomplete synchronizations")
CleanTempFiles(dir)
okln("Ready to synchronize")
m.Start()
if opts.NoDelete {
infoln("Deletes from peer nodes will be ignored")
} else {
infoln("Deletes from peer nodes are allowed")
}
okln("Ready to synchronize (read-write)")
m.StartRW(!opts.NoDelete, opts.Advanced.FilesInFlight, opts.Advanced.RequestsInFlight)
} else {
okln("Ready to synchronize (read only; no external updates accepted)")
}
// Periodically scan the repository and update the local model.
@@ -190,9 +192,9 @@ func main() {
select {}
}
func printStatsLoop(m *Model) {
func printStatsLoop(m *model.Model) {
var lastUpdated int64
var lastStats = make(map[string]ConnectionInfo)
var lastStats = make(map[string]model.ConnectionInfo)
for {
time.Sleep(60 * time.Second)
@@ -216,12 +218,12 @@ func printStatsLoop(m *Model) {
files, _, bytes = m.LocalSize()
infof("%6d files, %9sB in local repo", files, BinaryPrefix(bytes))
needFiles, bytes := m.NeedFiles()
infof("%6d files, %9sB in to synchronize", len(needFiles), BinaryPrefix(bytes))
infof("%6d files, %9sB to synchronize", len(needFiles), BinaryPrefix(bytes))
}
}
}
func listen(myID string, addr string, m *Model, cfg *tls.Config) {
func listen(myID string, addr string, m *model.Model, cfg *tls.Config) {
l, err := tls.Listen("tcp", addr, cfg)
fatalErr(err)
@@ -233,7 +235,7 @@ listen:
continue
}
if opts.Debug.TraceNet {
if opts.Debug.TraceConnect {
debugln("NET: Connect from", conn.RemoteAddr())
}
@@ -267,7 +269,7 @@ listen:
}
}
func connect(myID string, addr string, nodeAddrs map[string][]string, m *Model, cfg *tls.Config) {
func connect(myID string, addr string, nodeAddrs map[string][]string, m *model.Model, cfg *tls.Config) {
_, portstr, err := net.SplitHostPort(addr)
fatalErr(err)
port, _ := strconv.Atoi(portstr)
@@ -310,12 +312,12 @@ func connect(myID string, addr string, nodeAddrs map[string][]string, m *Model,
}
}
if opts.Debug.TraceNet {
if opts.Debug.TraceConnect {
debugln("NET: Dial", nodeID, addr)
}
conn, err := tls.Dial("tcp", addr, cfg)
if err != nil {
if opts.Debug.TraceNet {
if opts.Debug.TraceConnect {
debugln("NET:", err)
}
continue
@@ -337,14 +339,14 @@ func connect(myID string, addr string, nodeAddrs map[string][]string, m *Model,
}
}
func updateLocalModel(m *Model) {
files := Walk(m.Dir(), m, !opts.NoSymlinks)
func updateLocalModel(m *model.Model) {
files := m.FilteredWalk(!opts.NoSymlinks)
m.ReplaceLocal(files)
saveIndex(m)
}
func saveIndex(m *Model) {
name := fmt.Sprintf("%x.idx.gz", sha1.Sum([]byte(m.Dir())))
func saveIndex(m *model.Model) {
name := m.RepoID() + ".idx.gz"
fullName := path.Join(opts.ConfDir, name)
idxf, err := os.Create(fullName + ".tmp")
if err != nil {
@@ -359,9 +361,9 @@ func saveIndex(m *Model) {
os.Rename(fullName+".tmp", fullName)
}
func loadIndex(m *Model) {
fname := fmt.Sprintf("%x.idx.gz", sha1.Sum([]byte(m.Dir())))
idxf, err := os.Open(path.Join(opts.ConfDir, fname))
func loadIndex(m *model.Model) {
name := m.RepoID() + ".idx.gz"
idxf, err := os.Open(path.Join(opts.ConfDir, name))
if err != nil {
return
}
@@ -377,7 +379,7 @@ func loadIndex(m *Model) {
if err != nil {
return
}
m.SeedIndex(idx)
m.SeedLocal(idx)
}
func ensureDir(dir string, mode int) {
@@ -391,6 +393,13 @@ func ensureDir(dir string, mode int) {
}
}
func expandTilde(p string) string {
if strings.HasPrefix(p, "~/") {
return strings.Replace(p, "~", getHomeDir(), 1)
}
return p
}
func getHomeDir() string {
home := os.Getenv("HOME")
if home == "" {


@@ -1,4 +1,4 @@
package main
package model
import (
"bytes"
@@ -6,8 +6,6 @@ import (
"io"
)
type BlockList []Block
type Block struct {
Offset uint64
Length uint32
@@ -15,8 +13,8 @@ type Block struct {
}
// Blocks returns the blockwise hash of the reader.
func Blocks(r io.Reader, blocksize int) (BlockList, error) {
var blocks BlockList
func Blocks(r io.Reader, blocksize int) ([]Block, error) {
var blocks []Block
var offset uint64
for {
lr := &io.LimitedReader{r, int64(blocksize)}
@@ -42,9 +40,9 @@ func Blocks(r io.Reader, blocksize int) (BlockList, error) {
return blocks, nil
}
// To returns the list of blocks necessary to transform src into dst.
// Both block lists must have been created with the same block size.
func (src BlockList) To(tgt BlockList) (have, need BlockList) {
// BlockDiff returns lists of common and missing (to transform src into tgt)
// blocks. Both block lists must have been created with the same block size.
func BlockDiff(src, tgt []Block) (have, need []Block) {
if len(tgt) == 0 && len(src) != 0 {
return nil, nil
}


@@ -1,4 +1,4 @@
package main
package model
import (
"bytes"
@@ -98,7 +98,7 @@ func TestDiff(t *testing.T) {
for i, test := range diffTestData {
a, _ := Blocks(bytes.NewBufferString(test.a), test.s)
b, _ := Blocks(bytes.NewBufferString(test.b), test.s)
_, d := a.To(b)
_, d := BlockDiff(a, b)
if len(d) != len(test.d) {
t.Fatalf("Incorrect length for diff %d; %d != %d", i, len(d), len(test.d))
} else {


@@ -1,4 +1,4 @@
package main
package model
/*
@@ -12,8 +12,11 @@ acquire locks, but document what locks they require.
*/
import (
"crypto/sha1"
"errors"
"fmt"
"io"
"log"
"net"
"os"
"path"
@@ -40,6 +43,16 @@ type Model struct {
lastIdxBcast time.Time
lastIdxBcastRequest time.Time
rwRunning bool
parallellFiles int
paralllelReqs int
delete bool
trace map[string]bool
fileLastChanged map[string]time.Time
fileWasSuppressed map[string]int
}
const (
@@ -47,28 +60,64 @@ const (
idxBcastHoldtime = 15 * time.Second // Wait at least this long after the last index modification
idxBcastMaxDelay = 120 * time.Second // Unless we've already waited this long
minFileHoldTimeS = 60 // Never allow file changes more often than this
maxFileHoldTimeS = 600 // Always allow file changes at least this often
)
var ErrNoSuchFile = errors.New("no such file")
// NewModel creates and starts a new model. The model starts in read-only mode,
// where it sends index information to connected peers and responds to requests
// for file data without altering the local repository in any way.
func NewModel(dir string) *Model {
m := &Model{
dir: dir,
global: make(map[string]File),
local: make(map[string]File),
remote: make(map[string]map[string]File),
need: make(map[string]bool),
nodes: make(map[string]*protocol.Connection),
rawConn: make(map[string]io.ReadWriteCloser),
lastIdxBcast: time.Now(),
dir: dir,
global: make(map[string]File),
local: make(map[string]File),
remote: make(map[string]map[string]File),
need: make(map[string]bool),
nodes: make(map[string]*protocol.Connection),
rawConn: make(map[string]io.ReadWriteCloser),
lastIdxBcast: time.Now(),
trace: make(map[string]bool),
fileLastChanged: make(map[string]time.Time),
fileWasSuppressed: make(map[string]int),
}
go m.broadcastIndexLoop()
return m
}
func (m *Model) Start() {
// Trace enables trace logging of the given facility. This is a debugging function; grep for m.trace.
func (m *Model) Trace(t string) {
m.Lock()
defer m.Unlock()
m.trace[t] = true
}
// StartRW starts read/write processing on the current model. When in
// read/write mode the model will attempt to keep in sync with the cluster by
// pulling needed files from peer nodes.
func (m *Model) StartRW(del bool, pfiles, preqs int) {
m.Lock()
defer m.Unlock()
if m.rwRunning {
panic("starting started model")
}
m.rwRunning = true
m.delete = del
m.parallellFiles = pfiles
m.paralllelReqs = preqs
go m.cleanTempFiles()
go m.puller()
}
// Generation returns an opaque integer that is guaranteed to increment on
// every change to the local repository or global model.
func (m *Model) Generation() int64 {
m.RLock()
defer m.RUnlock()
@@ -81,6 +130,7 @@ type ConnectionInfo struct {
Address string
}
// ConnectionStats returns a map with connection statistics for each connected node.
func (m *Model) ConnectionStats() map[string]ConnectionInfo {
type remoteAddrer interface {
RemoteAddr() net.Addr
@@ -102,6 +152,8 @@ func (m *Model) ConnectionStats() map[string]ConnectionInfo {
return res
}
// GlobalSize returns the number of files, deleted files and total bytes for all
// files in the global model.
func (m *Model) GlobalSize() (files, deleted, bytes int) {
m.RLock()
defer m.RUnlock()
@@ -117,6 +169,8 @@ func (m *Model) GlobalSize() (files, deleted, bytes int) {
return
}
// LocalSize returns the number of files, deleted files and total bytes for all
// files in the local repository.
func (m *Model) LocalSize() (files, deleted, bytes int) {
m.RLock()
defer m.RUnlock()
@@ -132,6 +186,8 @@ func (m *Model) LocalSize() (files, deleted, bytes int) {
return
}
// InSyncSize returns the number and total byte size of the local files that
// are in sync with the global model.
func (m *Model) InSyncSize() (files, bytes int) {
m.RLock()
defer m.RUnlock()
@@ -145,31 +201,27 @@ func (m *Model) InSyncSize() (files, bytes int) {
return
}
type FileInfo struct {
Name string
Size int
}
func (m *Model) NeedFiles() (files []FileInfo, bytes int) {
// NeedFiles returns the list of currently needed files and the total size.
func (m *Model) NeedFiles() (files []File, bytes int) {
m.RLock()
defer m.RUnlock()
for n := range m.need {
f := m.global[n]
s := f.Size()
files = append(files, FileInfo{f.Name, s})
bytes += s
files = append(files, f)
bytes += f.Size()
}
return
}
// Index is called when a new node is connected and we receive their full index.
// Implements the protocol.Model interface.
func (m *Model) Index(nodeID string, fs []protocol.FileInfo) {
m.Lock()
defer m.Unlock()
if opts.Debug.TraceNet {
debugf("NET IDX(in): %s: %d files", nodeID, len(fs))
if m.trace["net"] {
log.Printf("NET IDX(in): %s: %d files", nodeID, len(fs))
}
m.remote[nodeID] = make(map[string]File)
@@ -182,12 +234,13 @@ func (m *Model) Index(nodeID string, fs []protocol.FileInfo) {
}
// IndexUpdate is called for incremental updates to connected nodes' indexes.
// Implements the protocol.Model interface.
func (m *Model) IndexUpdate(nodeID string, fs []protocol.FileInfo) {
m.Lock()
defer m.Unlock()
if opts.Debug.TraceNet {
debugf("NET IDXUP(in): %s: %d files", nodeID, len(fs))
if m.trace["net"] {
log.Printf("NET IDXUP(in): %s: %d files", nodeID, len(fs))
}
repo, ok := m.remote[nodeID]
@@ -196,7 +249,7 @@ func (m *Model) IndexUpdate(nodeID string, fs []protocol.FileInfo) {
}
for _, f := range fs {
if f.Flags&FlagDeleted != 0 && !opts.Delete {
if f.Flags&FlagDeleted != 0 && !m.delete {
// Files marked as deleted do not even enter the model
continue
}
@@ -207,20 +260,8 @@ func (m *Model) IndexUpdate(nodeID string, fs []protocol.FileInfo) {
m.recomputeNeed()
}
// SeedIndex is called when our previously cached index is loaded from disk at startup.
func (m *Model) SeedIndex(fs []protocol.FileInfo) {
m.Lock()
defer m.Unlock()
m.local = make(map[string]File)
for _, f := range fs {
m.local[f.Name] = fileFromFileInfo(f)
}
m.recomputeGlobal()
m.recomputeNeed()
}
// Close removes the peer from the model and closes the underlying connection if possible.
// Implements the protocol.Model interface.
func (m *Model) Close(node string, err error) {
m.Lock()
defer m.Unlock()
@@ -228,14 +269,6 @@ func (m *Model) Close(node string, err error) {
conn, ok := m.rawConn[node]
if ok {
conn.Close()
} else {
warnln("Close on unknown connection for node", node)
}
if err != nil {
warnf("Disconnected from node %s: %v", node, err)
} else {
infoln("Disconnected from node", node)
}
delete(m.remote, node)
@@ -246,9 +279,21 @@ func (m *Model) Close(node string, err error) {
m.recomputeNeed()
}
// Request returns the specified data segment by reading it from local disk.
// Implements the protocol.Model interface.
func (m *Model) Request(nodeID, name string, offset uint64, size uint32, hash []byte) ([]byte, error) {
if opts.Debug.TraceNet && nodeID != "<local>" {
debugf("NET REQ(in): %s: %q o=%d s=%d h=%x", nodeID, name, offset, size, hash)
// Verify that the requested file exists in the local and global model.
m.RLock()
_, localOk := m.local[name]
_, globalOk := m.global[name]
m.RUnlock()
if !localOk || !globalOk {
log.Printf("SECURITY (nonexistent file) REQ(in): %s: %q o=%d s=%d h=%x", nodeID, name, offset, size, hash)
return nil, ErrNoSuchFile
}
if m.trace["net"] && nodeID != "<local>" {
log.Printf("NET REQ(in): %s: %q o=%d s=%d h=%x", nodeID, name, offset, size, hash)
}
fn := path.Join(m.dir, name)
fd, err := os.Open(fn) // XXX: Inefficient, should cache fd?
@@ -266,21 +311,8 @@ func (m *Model) Request(nodeID, name string, offset uint64, size uint32, hash []
return buf, nil
}
func (m *Model) RequestGlobal(nodeID, name string, offset uint64, size uint32, hash []byte) ([]byte, error) {
m.RLock()
nc, ok := m.nodes[nodeID]
m.RUnlock()
if !ok {
return nil, fmt.Errorf("RequestGlobal: no such node: %s", nodeID)
}
if opts.Debug.TraceNet {
debugf("NET REQ(out): %s: %q o=%d s=%d h=%x", nodeID, name, offset, size, hash)
}
return nc.Request(name, offset, size, hash)
}
// ReplaceLocal replaces the local repository index with the given list of files.
// Change suppression is applied to files changing too often.
func (m *Model) ReplaceLocal(fs []File) {
m.Lock()
defer m.Unlock()
@@ -312,6 +344,117 @@ func (m *Model) ReplaceLocal(fs []File) {
}
}
// SeedLocal replaces the local repository index with the given list of files,
// in protocol data types. Does not track deletes, should only be used to seed
// the local index from a cache file at startup.
func (m *Model) SeedLocal(fs []protocol.FileInfo) {
m.Lock()
defer m.Unlock()
m.local = make(map[string]File)
for _, f := range fs {
m.local[f.Name] = fileFromFileInfo(f)
}
m.recomputeGlobal()
m.recomputeNeed()
}
// ConnectedTo returns true if we are connected to the named node.
func (m *Model) ConnectedTo(nodeID string) bool {
m.RLock()
defer m.RUnlock()
_, ok := m.nodes[nodeID]
return ok
}
// ProtocolIndex returns the current local index in protocol data types.
func (m *Model) ProtocolIndex() []protocol.FileInfo {
m.RLock()
defer m.RUnlock()
return m.protocolIndex()
}
// RepoID returns a unique ID representing the current repository location.
func (m *Model) RepoID() string {
return fmt.Sprintf("%x", sha1.Sum([]byte(m.dir)))
}
// AddConnection adds a new peer connection to the model. An initial index will
// be sent to the connected peer, thereafter index updates whenever the local
// repository changes.
func (m *Model) AddConnection(conn io.ReadWriteCloser, nodeID string) {
node := protocol.NewConnection(nodeID, conn, conn, m)
m.Lock()
m.nodes[nodeID] = node
m.rawConn[nodeID] = conn
m.Unlock()
m.RLock()
idx := m.protocolIndex()
m.RUnlock()
go func() {
node.Index(idx)
}()
}
func (m *Model) shouldSuppressChange(name string) bool {
sup := shouldSuppressChange(m.fileLastChanged[name], m.fileWasSuppressed[name])
if sup {
m.fileWasSuppressed[name]++
} else {
m.fileWasSuppressed[name] = 0
m.fileLastChanged[name] = time.Now()
}
return sup
}
func shouldSuppressChange(lastChange time.Time, numChanges int) bool {
sinceLast := time.Since(lastChange)
if sinceLast > maxFileHoldTimeS*time.Second {
return false
}
if sinceLast < time.Duration((numChanges+2)*minFileHoldTimeS)*time.Second {
return true
}
return false
}
// protocolIndex returns the current local index in protocol data types.
// Must be called with the read lock held.
func (m *Model) protocolIndex() []protocol.FileInfo {
var index []protocol.FileInfo
for _, f := range m.local {
mf := fileInfoFromFile(f)
if m.trace["idx"] {
var flagComment string
if mf.Flags&FlagDeleted != 0 {
flagComment = " (deleted)"
}
log.Printf("IDX: %q m=%d f=%o%s (%d blocks)", mf.Name, mf.Modified, mf.Flags, flagComment, len(mf.Blocks))
}
index = append(index, mf)
}
return index
}
func (m *Model) requestGlobal(nodeID, name string, offset uint64, size uint32, hash []byte) ([]byte, error) {
m.RLock()
nc, ok := m.nodes[nodeID]
m.RUnlock()
if !ok {
return nil, fmt.Errorf("requestGlobal: no such node: %s", nodeID)
}
if m.trace["net"] {
log.Printf("NET REQ(out): %s: %q o=%d s=%d h=%x", nodeID, name, offset, size, hash)
}
return nc.Request(name, offset, size, hash)
}
func (m *Model) broadcastIndexLoop() {
for {
m.RLock()
@@ -328,8 +471,8 @@ func (m *Model) broadcastIndexLoop() {
m.lastIdxBcast = time.Now()
for _, node := range m.nodes {
node := node
if opts.Debug.TraceNet {
debugf("NET IDX(out/loop): %s: %d files", node.ID, len(idx))
if m.trace["net"] {
log.Printf("NET IDX(out/loop): %s: %d files", node.ID, len(idx))
}
go func() {
node.Index(idx)
@@ -367,10 +510,7 @@ func (m *Model) markDeletedLocals(newLocal map[string]File) bool {
return updated
}
func (m *Model) UpdateLocal(f File) {
m.Lock()
defer m.Unlock()
func (m *Model) updateLocal(f File) {
if ef, ok := m.local[f.Name]; !ok || ef.Modified != f.Modified {
m.local[f.Name] = f
m.recomputeGlobal()
@@ -380,36 +520,6 @@ func (m *Model) UpdateLocal(f File) {
}
}
func (m *Model) Dir() string {
m.RLock()
defer m.RUnlock()
return m.dir
}
func (m *Model) HaveFiles() []File {
m.RLock()
defer m.RUnlock()
var files []File
for _, file := range m.local {
files = append(files, file)
}
return files
}
func (m *Model) LocalFile(name string) (File, bool) {
m.RLock()
defer m.RUnlock()
f, ok := m.local[name]
return f, ok
}
func (m *Model) GlobalFile(name string) (File, bool) {
m.RLock()
defer m.RUnlock()
f, ok := m.global[name]
return f, ok
}
// Must be called with the write lock held.
func (m *Model) recomputeGlobal() {
var newGlobal = make(map[string]File)
@@ -452,7 +562,7 @@ func (m *Model) recomputeNeed() {
for n, f := range m.global {
hf, ok := m.local[n]
if !ok || f.Modified > hf.Modified {
if f.Flags&FlagDeleted != 0 && !opts.Delete {
if f.Flags&FlagDeleted != 0 && !m.delete {
// Don't want to delete files, so forget this need
continue
}
@@ -460,8 +570,8 @@ func (m *Model) recomputeNeed() {
// Don't have the file, so don't need to delete it
continue
}
if opts.Debug.TraceNeed {
debugln("NEED:", ok, hf, f)
if m.trace["need"] {
log.Println("NEED:", ok, hf, f)
}
m.need[n] = true
}
@@ -482,56 +592,6 @@ func (m *Model) whoHas(name string) []string {
return remote
}
func (m *Model) ConnectedTo(nodeID string) bool {
m.RLock()
defer m.RUnlock()
_, ok := m.nodes[nodeID]
return ok
}
func (m *Model) ProtocolIndex() []protocol.FileInfo {
m.RLock()
defer m.RUnlock()
return m.protocolIndex()
}
// Must be called with the read lock held.
func (m *Model) protocolIndex() []protocol.FileInfo {
var index []protocol.FileInfo
for _, f := range m.local {
mf := fileInfoFromFile(f)
if opts.Debug.TraceIdx {
var flagComment string
if mf.Flags&FlagDeleted != 0 {
flagComment = " (deleted)"
}
debugf("IDX: %q m=%d f=%o%s (%d blocks)", mf.Name, mf.Modified, mf.Flags, flagComment, len(mf.Blocks))
}
index = append(index, mf)
}
return index
}
func (m *Model) AddConnection(conn io.ReadWriteCloser, nodeID string) {
node := protocol.NewConnection(nodeID, conn, conn, m)
m.Lock()
m.nodes[nodeID] = node
m.rawConn[nodeID] = conn
m.Unlock()
infoln("Connected to node", nodeID)
m.RLock()
idx := m.protocolIndex()
m.RUnlock()
go func() {
node.Index(idx)
infoln("Sent initial index to node", nodeID)
}()
}
func fileFromFileInfo(f protocol.FileInfo) File {
var blocks []Block
var offset uint64


@@ -1,4 +1,4 @@
package main
package model
/*
@@ -18,6 +18,7 @@ import (
"errors"
"fmt"
"io"
"log"
"os"
"path"
"sync"
@@ -60,7 +61,7 @@ func (m *Model) pullFile(name string) error {
applyDone.Done()
}()
local, remote := localFile.Blocks.To(globalFile.Blocks)
local, remote := BlockDiff(localFile.Blocks, globalFile.Blocks)
var fetchDone sync.WaitGroup
// One local copy routine
@@ -83,7 +84,7 @@ func (m *Model) pullFile(name string) error {
// N remote copy routines
var remoteBlocks = blockIterator{blocks: remote}
for i := 0; i < opts.Advanced.RequestsInFlight; i++ {
for i := 0; i < m.paralllelReqs; i++ {
curNode := nodeIDs[i%len(nodeIDs)]
fetchDone.Add(1)
@@ -93,7 +94,7 @@ func (m *Model) pullFile(name string) error {
if !ok {
break
}
data, err := m.RequestGlobal(nodeID, name, block.Offset, block.Length, block.Hash)
data, err := m.requestGlobal(nodeID, name, block.Offset, block.Length, block.Hash)
if err != nil {
break
}
@@ -120,6 +121,11 @@ func (m *Model) pullFile(name string) error {
return err
}
err = os.Chmod(tmpFilename, os.FileMode(globalFile.Flags&0777))
if err != nil {
return err
}
err = os.Rename(tmpFilename, filename)
if err != nil {
return err
@@ -143,7 +149,7 @@ func (m *Model) puller() {
continue
}
var limiter = make(chan bool, opts.Advanced.FilesInFlight)
var limiter = make(chan bool, m.parallellFiles)
var allDone sync.WaitGroup
for _, n := range ns {
@@ -156,28 +162,31 @@ func (m *Model) puller() {
<-limiter
}()
f, ok := m.GlobalFile(n)
m.RLock()
f, ok := m.global[n]
m.RUnlock()
if !ok {
return
}
var err error
if f.Flags&FlagDeleted == 0 {
if opts.Debug.TraceFile {
debugf("FILE: Pull %q", n)
if m.trace["file"] {
log.Printf("FILE: Pull %q", n)
}
err = m.pullFile(n)
} else {
if opts.Debug.TraceFile {
debugf("FILE: Remove %q", n)
if m.trace["file"] {
log.Printf("FILE: Remove %q", n)
}
// Cheerfully ignore errors here
_ = os.Remove(path.Join(m.dir, n))
}
if err == nil {
m.UpdateLocal(f)
} else {
warnln(err)
m.Lock()
m.updateLocal(f)
m.Unlock()
}
}(n)
}
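The limiter channel introduced in this hunk is a counting semaphore: its buffer capacity (`m.parallellFiles`) bounds how many file pulls run at once. A standalone sketch of the pattern, with hypothetical names and illustrative values rather than the actual puller code:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// runLimited runs `jobs` stand-in tasks with at most `parallel` in flight,
// using a buffered channel as a counting semaphore like the limiter above.
func runLimited(jobs, parallel int) int64 {
	limiter := make(chan bool, parallel)
	var wg sync.WaitGroup
	var done int64
	for i := 0; i < jobs; i++ {
		wg.Add(1)
		go func() {
			limiter <- true // blocks while `parallel` tasks hold a slot
			defer func() {
				<-limiter
				wg.Done()
			}()
			atomic.AddInt64(&done, 1) // stand-in for pulling one file
		}()
	}
	wg.Wait()
	return done
}

func main() {
	fmt.Println(runLimited(10, 3)) // 10
}
```

Acquiring a slot before doing the work and releasing it in a deferred receive keeps the in-flight count bounded even when a task fails part-way through.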


@@ -1,6 +1,8 @@
package main
package model
import (
"bytes"
"os"
"reflect"
"testing"
"time"
@@ -27,27 +29,37 @@ func TestNewModel(t *testing.T) {
var testDataExpected = map[string]File{
"foo": File{
Name: "foo",
Flags: 0644,
Modified: 1384244572,
Flags: 0,
Modified: 0,
Blocks: []Block{{Offset: 0x0, Length: 0x7, Hash: []uint8{0xae, 0xc0, 0x70, 0x64, 0x5f, 0xe5, 0x3e, 0xe3, 0xb3, 0x76, 0x30, 0x59, 0x37, 0x61, 0x34, 0xf0, 0x58, 0xcc, 0x33, 0x72, 0x47, 0xc9, 0x78, 0xad, 0xd1, 0x78, 0xb6, 0xcc, 0xdf, 0xb0, 0x1, 0x9f}}},
},
"bar": File{
Name: "bar",
Flags: 0644,
Modified: 1384244579,
Flags: 0,
Modified: 0,
Blocks: []Block{{Offset: 0x0, Length: 0xa, Hash: []uint8{0x2f, 0x72, 0xcc, 0x11, 0xa6, 0xfc, 0xd0, 0x27, 0x1e, 0xce, 0xf8, 0xc6, 0x10, 0x56, 0xee, 0x1e, 0xb1, 0x24, 0x3b, 0xe3, 0x80, 0x5b, 0xf9, 0xa9, 0xdf, 0x98, 0xf9, 0x2f, 0x76, 0x36, 0xb0, 0x5c}}},
},
"baz/quux": File{
Name: "baz/quux",
Flags: 0644,
Modified: 1384244676,
Flags: 0,
Modified: 0,
Blocks: []Block{{Offset: 0x0, Length: 0x9, Hash: []uint8{0xc1, 0x54, 0xd9, 0x4e, 0x94, 0xba, 0x72, 0x98, 0xa6, 0xad, 0xb0, 0x52, 0x3a, 0xfe, 0x34, 0xd1, 0xb6, 0xa5, 0x81, 0xd6, 0xb8, 0x93, 0xa7, 0x63, 0xd4, 0x5d, 0xdc, 0x5e, 0x20, 0x9d, 0xcb, 0x83}}},
},
}
func init() {
// Fix expected test data to match reality
for n, f := range testDataExpected {
fi, _ := os.Stat("testdata/" + n)
f.Flags = uint32(fi.Mode())
f.Modified = fi.ModTime().Unix()
testDataExpected[n] = f
}
}
func TestUpdateLocal(t *testing.T) {
m := NewModel("foo")
fs := Walk("testdata", m, false)
m := NewModel("testdata")
fs, _ := m.Walk(false)
m.ReplaceLocal(fs)
if len(m.need) > 0 {
@@ -88,8 +100,8 @@ func TestUpdateLocal(t *testing.T) {
}
func TestRemoteUpdateExisting(t *testing.T) {
m := NewModel("foo")
fs := Walk("testdata", m, false)
m := NewModel("testdata")
fs, _ := m.Walk(false)
m.ReplaceLocal(fs)
newFile := protocol.FileInfo{
@@ -105,8 +117,8 @@ func TestRemoteUpdateExisting(t *testing.T) {
}
func TestRemoteAddNew(t *testing.T) {
m := NewModel("foo")
fs := Walk("testdata", m, false)
m := NewModel("testdata")
fs, _ := m.Walk(false)
m.ReplaceLocal(fs)
newFile := protocol.FileInfo{
@@ -122,8 +134,8 @@ func TestRemoteAddNew(t *testing.T) {
}
func TestRemoteUpdateOld(t *testing.T) {
m := NewModel("foo")
fs := Walk("testdata", m, false)
m := NewModel("testdata")
fs, _ := m.Walk(false)
m.ReplaceLocal(fs)
oldTimeStamp := int64(1234)
@@ -140,8 +152,8 @@ func TestRemoteUpdateOld(t *testing.T) {
}
func TestRemoteIndexUpdate(t *testing.T) {
m := NewModel("foo")
fs := Walk("testdata", m, false)
m := NewModel("testdata")
fs, _ := m.Walk(false)
m.ReplaceLocal(fs)
foo := protocol.FileInfo{
@@ -173,8 +185,8 @@ func TestRemoteIndexUpdate(t *testing.T) {
}
func TestDelete(t *testing.T) {
m := NewModel("foo")
fs := Walk("testdata", m, false)
m := NewModel("testdata")
fs, _ := m.Walk(false)
m.ReplaceLocal(fs)
if l1, l2 := len(m.local), len(fs); l1 != l2 {
@@ -190,7 +202,7 @@ func TestDelete(t *testing.T) {
Modified: ot,
Blocks: []Block{{0, 100, []byte("some hash bytes")}},
}
m.UpdateLocal(newFile)
m.updateLocal(newFile)
if l1, l2 := len(m.local), len(fs)+1; l1 != l2 {
t.Errorf("Model len(local) incorrect (%d != %d)", l1, l2)
@@ -263,8 +275,8 @@ func TestDelete(t *testing.T) {
}
func TestForgetNode(t *testing.T) {
m := NewModel("foo")
fs := Walk("testdata", m, false)
m := NewModel("testdata")
fs, _ := m.Walk(false)
m.ReplaceLocal(fs)
if l1, l2 := len(m.local), len(fs); l1 != l2 {
@@ -306,3 +318,50 @@ func TestForgetNode(t *testing.T) {
t.Errorf("Model len(need) incorrect (%d != %d)", l1, l2)
}
}
func TestRequest(t *testing.T) {
m := NewModel("testdata")
fs, _ := m.Walk(false)
m.ReplaceLocal(fs)
bs, err := m.Request("some node", "foo", 0, 6, nil)
if err != nil {
t.Fatal(err)
}
if bytes.Compare(bs, []byte("foobar")) != 0 {
t.Errorf("Incorrect data from request: %q", string(bs))
}
bs, err = m.Request("some node", "../walk.go", 0, 6, nil)
if err == nil {
t.Error("Unexpected nil error on insecure file read")
}
if bs != nil {
t.Errorf("Unexpected non nil data on insecure file read: %q", string(bs))
}
}
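TestRequest depends on the model refusing to serve files it doesn't track (commit "Verify requests against model"), so a peer can't escape the repository with a crafted name like "../walk.go". The real guard lives in Model.Request; this standalone sketch assumes a simple membership check against the local file map, and all names in it are hypothetical:

```go
package main

import (
	"errors"
	"fmt"
	"os"
	"path"
)

var errNoSuchFile = errors.New("no such file in model") // hypothetical error value

// request serves a read only for files present in the local model, so a
// relative name pointing outside the repository is rejected before any
// file system access happens.
func request(dir string, local map[string]bool, name string, offset int64, size uint32) ([]byte, error) {
	if !local[name] {
		return nil, errNoSuchFile
	}
	fd, err := os.Open(path.Join(dir, name))
	if err != nil {
		return nil, err
	}
	defer fd.Close()
	buf := make([]byte, size)
	n, err := fd.ReadAt(buf, offset)
	if err != nil && n != int(size) {
		return nil, err
	}
	return buf[:n], nil
}

func main() {
	local := map[string]bool{"foo": true}
	_, err := request("testdata", local, "../walk.go", 0, 6)
	fmt.Println(err) // no such file in model
}
```

Validating against the model rather than sanitizing the path means only files the node has itself indexed can ever be read.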
func TestSuppression(t *testing.T) {
var testdata = []struct {
lastChange time.Time
hold int
result bool
}{
{time.Unix(0, 0), 0, false}, // First change
{time.Now().Add(-1 * time.Second), 0, true}, // Changed once one second ago, suppress
{time.Now().Add(-119 * time.Second), 0, true}, // Changed once 119 seconds ago, suppress
{time.Now().Add(-121 * time.Second), 0, false}, // Changed once 121 seconds ago, permit
{time.Now().Add(-179 * time.Second), 1, true}, // Suppressed once 179 seconds ago, suppress again
{time.Now().Add(-181 * time.Second), 1, false}, // Suppressed once 181 seconds ago, permit
{time.Now().Add(-599 * time.Second), 99, true}, // Suppressed lots of times, last allowed 599 seconds ago, suppress again
{time.Now().Add(-601 * time.Second), 99, false}, // Suppressed lots of times, last allowed 601 seconds ago, permit
}
for i, tc := range testdata {
if shouldSuppressChange(tc.lastChange, tc.hold) != tc.result {
t.Errorf("Incorrect result for test #%d: %v", i, tc)
}
}
}

model/testdata/.stignore

@@ -0,0 +1,2 @@
.*
quux


model/walk.go

@@ -0,0 +1,203 @@
package model
import (
"bytes"
"fmt"
"io/ioutil"
"log"
"os"
"path"
"path/filepath"
"strings"
"time"
)
const BlockSize = 128 * 1024
type File struct {
Name string
Flags uint32
Modified int64
Blocks []Block
}
func (f File) Size() (bytes int) {
for _, b := range f.Blocks {
bytes += int(b.Length)
}
return
}
func isTempName(name string) bool {
return strings.HasPrefix(path.Base(name), ".syncthing.")
}
func tempName(name string, modified int64) string {
tdir := path.Dir(name)
tname := fmt.Sprintf(".syncthing.%s.%d", path.Base(name), modified)
return path.Join(tdir, tname)
}
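Both temp-name helpers are pure string manipulation and can be exercised in isolation:

```go
package main

import (
	"fmt"
	"path"
	"strings"
)

// isTempName reports whether the base name carries the syncthing temp prefix.
func isTempName(name string) bool {
	return strings.HasPrefix(path.Base(name), ".syncthing.")
}

// tempName builds the in-directory temporary name used while pulling a file.
func tempName(name string, modified int64) string {
	tdir := path.Dir(name)
	tname := fmt.Sprintf(".syncthing.%s.%d", path.Base(name), modified)
	return path.Join(tdir, tname)
}

func main() {
	tn := tempName("music/song.mp3", 1389081600)
	fmt.Println(tn)             // music/.syncthing.song.mp3.1389081600
	fmt.Println(isTempName(tn)) // true
}
```

Keeping the temporary file in the same directory as its target means the final os.Rename stays within one file system, where the rename is atomic on POSIX systems.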
func (m *Model) genWalker(res *[]File, ign map[string][]string) filepath.WalkFunc {
return func(p string, info os.FileInfo, err error) error {
if err != nil {
return nil
}
if isTempName(p) {
return nil
}
rn, err := filepath.Rel(m.dir, p)
if err != nil {
return nil
}
if pn, sn := path.Split(rn); sn == ".stignore" {
pn := strings.Trim(pn, "/")
bs, _ := ioutil.ReadFile(p)
lines := bytes.Split(bs, []byte("\n"))
var patterns []string
for _, line := range lines {
if len(line) > 0 {
patterns = append(patterns, string(line))
}
}
ign[pn] = patterns
return nil
}
if info.Mode()&os.ModeType == 0 {
fi, err := os.Stat(p)
if err != nil {
return nil
}
modified := fi.ModTime().Unix()
m.RLock()
hf, ok := m.local[rn]
m.RUnlock()
if ok && hf.Modified == modified {
// No change
*res = append(*res, hf)
} else {
m.Lock()
if m.shouldSuppressChange(rn) {
if m.trace["file"] {
log.Println("FILE: SUPPRESS:", rn, m.fileWasSuppressed[rn], time.Since(m.fileLastChanged[rn]))
}
if ok {
// Re-add the previous entry; ignored files are also suppressed, but they have no entry in the local model
*res = append(*res, hf)
}
m.Unlock()
return nil
}
m.Unlock()
if m.trace["file"] {
log.Printf("FILE: Hash %q", p)
}
fd, err := os.Open(p)
if err != nil {
if m.trace["file"] {
log.Printf("FILE: %q: %v", p, err)
}
return nil
}
defer fd.Close()
blocks, err := Blocks(fd, BlockSize)
if err != nil {
if m.trace["file"] {
log.Printf("FILE: %q: %v", p, err)
}
return nil
}
f := File{
Name: rn,
Flags: uint32(info.Mode()),
Modified: modified,
Blocks: blocks,
}
*res = append(*res, f)
}
}
return nil
}
}
// Walk returns the list of files found in the local repository by scanning the
// file system. Files are blockwise hashed.
func (m *Model) Walk(followSymlinks bool) (files []File, ignore map[string][]string) {
ignore = make(map[string][]string)
fn := m.genWalker(&files, ignore)
filepath.Walk(m.dir, fn)
if followSymlinks {
d, err := os.Open(m.dir)
if err != nil {
return
}
defer d.Close()
fis, err := d.Readdir(-1)
if err != nil {
return
}
for _, fi := range fis {
if fi.Mode()&os.ModeSymlink != 0 {
filepath.Walk(path.Join(m.dir, fi.Name())+"/", fn)
}
}
}
return
}
// FilteredWalk returns the list of files found in the local repository by
// scanning the file system. Files are blockwise hashed. Files matching
// patterns listed in .stignore files are removed from the results.
func (m *Model) FilteredWalk(followSymlinks bool) []File {
var files, ignored = m.Walk(followSymlinks)
return ignoreFilter(ignored, files)
}
func (m *Model) cleanTempFile(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if info.Mode()&os.ModeType == 0 && isTempName(path) {
if m.trace["file"] {
log.Printf("FILE: Remove %q", path)
}
os.Remove(path)
}
return nil
}
func (m *Model) cleanTempFiles() {
filepath.Walk(m.dir, m.cleanTempFile)
}
func ignoreFilter(patterns map[string][]string, files []File) (filtered []File) {
nextFile:
for _, f := range files {
first, last := path.Split(f.Name)
for prefix, pats := range patterns {
if len(prefix) == 0 || prefix == first || strings.HasPrefix(first, prefix+"/") {
for _, pattern := range pats {
if match, _ := path.Match(pattern, last); match {
continue nextFile
}
}
}
}
filtered = append(filtered, f)
}
return filtered
}

model/walk_test.go

@@ -0,0 +1,102 @@
package model
import (
"fmt"
"reflect"
"testing"
"time"
)
var testdata = []struct {
name string
size int
hash string
}{
{"bar", 10, "2f72cc11a6fcd0271ecef8c61056ee1eb1243be3805bf9a9df98f92f7636b05c"},
{"baz/quux", 9, "c154d94e94ba7298a6adb0523afe34d1b6a581d6b893a763d45ddc5e209dcb83"},
{"foo", 7, "aec070645fe53ee3b3763059376134f058cc337247c978add178b6ccdfb0019f"},
}
var correctIgnores = map[string][]string{
"": {".*", "quux"},
}
func TestWalk(t *testing.T) {
m := NewModel("testdata")
files, ignores := m.Walk(false)
if l1, l2 := len(files), len(testdata); l1 != l2 {
t.Fatalf("Incorrect number of walked files %d != %d", l1, l2)
}
for i := range testdata {
if n1, n2 := testdata[i].name, files[i].Name; n1 != n2 {
t.Errorf("Incorrect file name %q != %q for case #%d", n1, n2, i)
}
if h1, h2 := fmt.Sprintf("%x", files[i].Blocks[0].Hash), testdata[i].hash; h1 != h2 {
t.Errorf("Incorrect hash %q != %q for case #%d", h1, h2, i)
}
t0 := time.Date(2010, 1, 1, 0, 0, 0, 0, time.UTC).Unix()
t1 := time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC).Unix()
if mt := files[i].Modified; mt < t0 || mt > t1 {
t.Errorf("Unrealistic modtime %d for test %d", mt, i)
}
}
if !reflect.DeepEqual(ignores, correctIgnores) {
t.Errorf("Incorrect ignores\n %v\n %v", correctIgnores, ignores)
}
}
func TestFilteredWalk(t *testing.T) {
m := NewModel("testdata")
files := m.FilteredWalk(false)
if len(files) != 2 {
t.Fatalf("Incorrect number of walked filtered files %d != 2", len(files))
}
if files[0].Name != "bar" {
t.Error("Incorrect first file", files[0])
}
if files[1].Name != "foo" {
t.Error("Incorrect second file", files[1])
}
}
func TestIgnore(t *testing.T) {
var patterns = map[string][]string{
"": {"t2"},
"foo": {"bar", "z*"},
"foo/baz": {"quux", ".*"},
}
var files = []File{
{Name: "foo/bar"},
{Name: "foo/quux"},
{Name: "foo/zuux"},
{Name: "foo/qzuux"},
{Name: "foo/baz/t1"},
{Name: "foo/baz/t2"},
{Name: "foo/baz/bar"},
{Name: "foo/baz/quuxa"},
{Name: "foo/baz/aquux"},
{Name: "foo/baz/.quux"},
{Name: "foo/baz/zquux"},
{Name: "foo/baz/quux"},
{Name: "foo/bazz/quux"},
}
var remaining = []File{
{Name: "foo/quux"},
{Name: "foo/qzuux"},
{Name: "foo/baz/t1"},
{Name: "foo/baz/quuxa"},
{Name: "foo/baz/aquux"},
{Name: "foo/bazz/quux"},
}
var filtered = ignoreFilter(patterns, files)
if !reflect.DeepEqual(filtered, remaining) {
t.Errorf("Filtering mismatch\n %v\n %v", remaining, filtered)
}
}


@@ -9,7 +9,7 @@ Each node has a _repository_ of files described by the _local model_,
containing modifications times and block hashes. The local model is sent
to the other nodes in the cluster. The union of all files in the local
models, with files selected for most recent modification time, forms the
_global model_. Each node strives to get it's repository in synch with
_global model_. Each node strives to get its repository in sync with
the global model by requesting missing blocks from the other nodes.
Transport and Authentication


@@ -1,13 +1,6 @@
package main
import (
"fmt"
"time"
)
func timing(name string, t0 time.Time) {
debugf("%s: %.02f ms", name, time.Since(t0).Seconds()*1000)
}
import "fmt"
func MetricPrefix(n int) string {
if n > 1e9 {

walk.go

@@ -1,154 +0,0 @@
package main
import (
"fmt"
"os"
"path"
"path/filepath"
"strings"
)
const BlockSize = 128 * 1024
type File struct {
Name string
Flags uint32
Modified int64
Blocks BlockList
}
func (f File) Dump() {
fmt.Printf("%s\n", f.Name)
for _, b := range f.Blocks {
fmt.Printf(" %dB @ %d: %x\n", b.Length, b.Offset, b.Hash)
}
fmt.Println()
}
func (f File) Size() (bytes int) {
for _, b := range f.Blocks {
bytes += int(b.Length)
}
return
}
func isTempName(name string) bool {
return strings.HasPrefix(path.Base(name), ".syncthing.")
}
func tempName(name string, modified int64) string {
tdir := path.Dir(name)
tname := fmt.Sprintf(".syncthing.%s.%d", path.Base(name), modified)
return path.Join(tdir, tname)
}
func genWalker(base string, res *[]File, model *Model) filepath.WalkFunc {
return func(p string, info os.FileInfo, err error) error {
if err != nil {
warnln(err)
return nil
}
if isTempName(p) {
return nil
}
if info.Mode()&os.ModeType == 0 {
rn, err := filepath.Rel(base, p)
if err != nil {
warnln(err)
return nil
}
fi, err := os.Stat(p)
if err != nil {
warnln(err)
return nil
}
modified := fi.ModTime().Unix()
hf, ok := model.LocalFile(rn)
if ok && hf.Modified == modified {
// No change
*res = append(*res, hf)
} else {
if opts.Debug.TraceFile {
debugf("FILE: Hash %q", p)
}
fd, err := os.Open(p)
if err != nil {
warnln(err)
return nil
}
defer fd.Close()
blocks, err := Blocks(fd, BlockSize)
if err != nil {
warnln(err)
return nil
}
f := File{
Name: rn,
Flags: uint32(info.Mode()),
Modified: modified,
Blocks: blocks,
}
*res = append(*res, f)
}
}
return nil
}
}
func Walk(dir string, model *Model, followSymlinks bool) []File {
var files []File
fn := genWalker(dir, &files, model)
err := filepath.Walk(dir, fn)
if err != nil {
warnln(err)
}
if !opts.NoSymlinks {
d, err := os.Open(dir)
if err != nil {
warnln(err)
return files
}
defer d.Close()
fis, err := d.Readdir(-1)
if err != nil {
warnln(err)
return files
}
for _, fi := range fis {
if fi.Mode()&os.ModeSymlink != 0 {
err := filepath.Walk(path.Join(dir, fi.Name())+"/", fn)
if err != nil {
warnln(err)
}
}
}
}
return files
}
func cleanTempFile(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if info.Mode()&os.ModeType == 0 && isTempName(path) {
if opts.Debug.TraceFile {
debugf("FILE: Remove %q", path)
}
os.Remove(path)
}
return nil
}
func CleanTempFiles(dir string) {
filepath.Walk(dir, cleanTempFile)
}


@@ -1,42 +0,0 @@
package main
import (
"fmt"
"testing"
"time"
)
var testdata = []struct {
name string
size int
hash string
}{
{"bar", 10, "2f72cc11a6fcd0271ecef8c61056ee1eb1243be3805bf9a9df98f92f7636b05c"},
{"baz/quux", 9, "c154d94e94ba7298a6adb0523afe34d1b6a581d6b893a763d45ddc5e209dcb83"},
{"foo", 7, "aec070645fe53ee3b3763059376134f058cc337247c978add178b6ccdfb0019f"},
}
func TestWalk(t *testing.T) {
m := new(Model)
files := Walk("testdata", m, false)
if l1, l2 := len(files), len(testdata); l1 != l2 {
t.Fatalf("Incorrect number of walked files %d != %d", l1, l2)
}
for i := range testdata {
if n1, n2 := testdata[i].name, files[i].Name; n1 != n2 {
t.Errorf("Incorrect file name %q != %q for case #%d", n1, n2, i)
}
if h1, h2 := fmt.Sprintf("%x", files[i].Blocks[0].Hash), testdata[i].hash; h1 != h2 {
t.Errorf("Incorrect hash %q != %q for case #%d", h1, h2, i)
}
t0 := time.Date(2010, 1, 1, 0, 0, 0, 0, time.UTC).Unix()
t1 := time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC).Unix()
if mt := files[i].Modified; mt < t0 || mt > t1 {
t.Errorf("Unrealistic modtime %d for test %d", mt, i)
}
}
}