Mirror of https://github.com/syncthing/syncthing.git

Compare commits: v1.4.0-rc. ... v1.4.0 (12 commits)
| Author | SHA1 | Date |
|---|---|---|
|  | db02545ef3 |  |
|  | 52e72e0122 |  |
|  | d1e0a38c04 |  |
|  | 0b610017ea |  |
|  | 9a1df97c69 |  |
|  | ee61da5b6a |  |
|  | a5e12a0a3d |  |
|  | 4f29180e7c |  |
|  | 0fb2cd52ff |  |
|  | a4bd4d118a |  |
|  | bb375b1aff |  |
|  | 05e23f1991 |  |
build.go (21 changed lines)
```diff
@@ -654,7 +654,11 @@ func shouldBuildSyso(dir string) (string, error) {
 	}
 
 	jsonPath := filepath.Join(dir, "versioninfo.json")
-	ioutil.WriteFile(jsonPath, bs, 0644)
+	err = ioutil.WriteFile(jsonPath, bs, 0644)
+	if err != nil {
+		return "", errors.New("failed to create " + jsonPath + ": " + err.Error())
+	}
+
 	defer func() {
 		if err := os.Remove(jsonPath); err != nil {
 			log.Printf("Warning: unable to remove generated %s: %v. Please remove it manually.", jsonPath, err)
```
```diff
@@ -860,13 +864,22 @@ func getVersion() string {
 	return "unknown-dev"
 }
 
-func semanticVersion() (major, minor, patch, build string) {
+func semanticVersion() (major, minor, patch, build int) {
 	r := regexp.MustCompile(`v(?P<Major>\d+)\.(?P<Minor>\d+).(?P<Patch>\d+).*\+(?P<CommitsAhead>\d+)`)
 	matches := r.FindStringSubmatch(getVersion())
 	if len(matches) != 5 {
-		return "0", "0", "0", "0"
+		return 0, 0, 0, 0
 	}
-	return matches[1], matches[2], matches[3], matches[4]
+
+	var ints [4]int
+	for i := 1; i < 5; i++ {
+		value, err := strconv.Atoi(matches[i])
+		if err != nil {
+			return 0, 0, 0, 0
+		}
+		ints[i-1] = value
+	}
+	return ints[0], ints[1], ints[2], ints[3]
 }
 
 func getBranchSuffix() string {
```
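To illustrate the new integer-returning parser, here is a standalone sketch reusing the same regexp on a made-up development version string ("v1.4.0-rc.3+22-gabcdef" is invented for the example, not taken from the diff):

```go
package main

import (
	"fmt"
	"regexp"
	"strconv"
)

func main() {
	// Same pattern as semanticVersion above; the input string is hypothetical.
	r := regexp.MustCompile(`v(?P<Major>\d+)\.(?P<Minor>\d+).(?P<Patch>\d+).*\+(?P<CommitsAhead>\d+)`)
	matches := r.FindStringSubmatch("v1.4.0-rc.3+22-gabcdef")
	if len(matches) != 5 {
		fmt.Println("no match")
		return
	}
	var ints [4]int
	for i := 1; i < 5; i++ {
		value, err := strconv.Atoi(matches[i])
		if err != nil {
			return // Atoi can only fail here on absurdly large captures
		}
		ints[i-1] = value
	}
	fmt.Println(ints) // [1 4 0 22]: major, minor, patch, commits ahead
}
```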
```diff
@@ -148,7 +148,7 @@ func idxck(ldb backend.Backend) (success bool) {
 		}
 	}
 
-	if fi.BlocksHash != nil {
+	if len(fi.Blocks) == 0 && len(fi.BlocksHash) != 0 {
 		key := string(fi.BlocksHash)
 		if _, ok := blocklists[key]; !ok {
 			fmt.Printf("Missing block list for file %q, block list hash %x\n", fi.Name, fi.BlocksHash)
```
```diff
@@ -119,8 +119,8 @@ are mostly useful for developers. Use with care.
                     "h", "m" and "s" abbreviations for hours minutes and seconds.
                     Valid values are like "720h", "30s", etc.
 
- STGCBLOCKSEVERY    Set to a time interval to override the default database
-                    block GC interval of 13 hours. Same format as the
+ STGCINDIRECTEVERY  Set to a time interval to override the default database
+                    indirection GC interval of 13 hours. Same format as the
                     STRECHECKDBEVERY variable.
 
 GOMAXPROCS          Set the maximum number of CPU cores to use. Defaults to all
```
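The interval values these variables accept are standard Go duration strings; a quick standalone check of the examples from the help text:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// "720h" and "30s" are the examples from the help text above; "13h" is
	// the default GC interval it mentions.
	for _, s := range []string{"720h", "30s", "13h"} {
		d, err := time.ParseDuration(s)
		fmt.Println(s, "->", d, err) // e.g. 720h -> 720h0m0s <nil>
	}
}
```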
```diff
@@ -19,22 +19,25 @@ import (
 )
 
 const (
-	// We set the bloom filter capacity to handle 100k individual block lists
-	// with a false positive probability of 1% for the first pass. Once we know
-	// how many block lists we have we will use that number instead, if it's
-	// more than 100k. For fewer than 100k block lists we will just get better
-	// false positive rate instead.
-	blockGCBloomCapacity          = 100000
-	blockGCBloomFalsePositiveRate = 0.01 // 1%
-	blockGCDefaultInterval        = 13 * time.Hour
-	blockGCTimeKey                = "lastBlockGCTime"
+	// We set the bloom filter capacity to handle 100k individual items with
+	// a false positive probability of 1% for the first pass. Once we know
+	// how many items we have we will use that number instead, if it's more
+	// than 100k. For fewer than 100k items we will just get better false
+	// positive rate instead.
+	indirectGCBloomCapacity          = 100000
+	indirectGCBloomFalsePositiveRate = 0.01 // 1%
+	indirectGCDefaultInterval        = 13 * time.Hour
+	indirectGCTimeKey                = "lastIndirectGCTime"
+
+	// Use indirection for the block list when it exceeds this many entries
+	blocksIndirectionCutoff = 3
 )
 
-var blockGCInterval = blockGCDefaultInterval
+var indirectGCInterval = indirectGCDefaultInterval
 
 func init() {
-	if dur, err := time.ParseDuration(os.Getenv("STGCBLOCKSEVERY")); err == nil {
-		blockGCInterval = dur
+	if dur, err := time.ParseDuration(os.Getenv("STGCINDIRECTEVERY")); err == nil {
+		indirectGCInterval = dur
 	}
 }
```
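The constants above size a bloom filter for an estimated item count and target false positive rate: tests may return false positives (here, garbage kept one extra cycle) but never false negatives (live data is never dropped). A minimal sketch, assuming the widely used github.com/willf/bloom package, whose API matches the `bloom.NewWithEstimates` call in the diff (the exact import in this tree is not shown here):

```go
package main

import (
	"fmt"

	"github.com/willf/bloom" // assumed import; matches the NewWithEstimates call above
)

func main() {
	// First pass: sized for 100k items at a 1% false positive rate, as in
	// indirectGCBloomCapacity / indirectGCBloomFalsePositiveRate.
	filter := bloom.NewWithEstimates(100000, 0.01)

	filter.Add([]byte("block list hash A"))

	fmt.Println(filter.Test([]byte("block list hash A"))) // true: definitely added
	fmt.Println(filter.Test([]byte("block list hash B"))) // usually false; ~1% chance of a false positive
}
```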
```diff
@@ -485,18 +488,28 @@ func (db *Lowlevel) dropPrefix(prefix []byte) error {
 }
 
 func (db *Lowlevel) gcRunner() {
-	t := time.NewTimer(db.timeUntil(blockGCTimeKey, blockGCInterval))
+	// Calculate the time for the next GC run. Even if we should run GC
+	// directly, give the system a while to get up and running and do other
+	// stuff first. (We might have migrations and stuff which would be
+	// better off running before GC.)
+	next := db.timeUntil(indirectGCTimeKey, indirectGCInterval)
+	if next < time.Minute {
+		next = time.Minute
+	}
+
+	t := time.NewTimer(next)
 	defer t.Stop()
 
 	for {
 		select {
 		case <-db.gcStop:
 			return
 		case <-t.C:
-			if err := db.gcBlocks(); err != nil {
-				l.Warnln("Database block GC failed:", err)
+			if err := db.gcIndirect(); err != nil {
+				l.Warnln("Database indirection GC failed:", err)
 			}
-			db.recordTime(blockGCTimeKey)
-			t.Reset(db.timeUntil(blockGCTimeKey, blockGCInterval))
+			db.recordTime(indirectGCTimeKey)
+			t.Reset(db.timeUntil(indirectGCTimeKey, indirectGCInterval))
 		}
 	}
 }
```
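The scheduling pattern above persists the last run time under a key and sleeps only for whatever remains of the interval, so restarts don't reset the GC clock. A minimal free-standing sketch of that idea; the names here are illustrative, not syncthing's actual API:

```go
package main

import (
	"fmt"
	"time"
)

// timeUntil returns how long to wait before the next run, given when the
// task last ran and how often it should run. A past-due task yields zero.
func timeUntil(lastRun time.Time, every time.Duration) time.Duration {
	sleep := every - time.Since(lastRun)
	if sleep < 0 {
		return 0
	}
	return sleep
}

func main() {
	lastRun := time.Now().Add(-10 * time.Hour) // pretend we last ran 10h ago
	next := timeUntil(lastRun, 13*time.Hour)   // ~3h remaining
	if next < time.Minute {
		next = time.Minute // as above: never fire immediately at startup
	}
	fmt.Println("next GC in about", next.Round(time.Minute))
}
```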
```diff
@@ -521,15 +534,16 @@ func (db *Lowlevel) timeUntil(key string, every time.Duration) time.Duration {
 	return sleepTime
 }
 
-func (db *Lowlevel) gcBlocks() error {
-	// The block GC uses a bloom filter to track used block lists. This means
-	// iterating over all items, adding their block lists to the filter, then
-	// iterating over the block lists and removing those that don't match the
-	// filter. The filter will give false positives so we will keep around one
-	// percent of block lists that we don't really need (at most).
+func (db *Lowlevel) gcIndirect() error {
+	// The indirection GC uses bloom filters to track used block lists and
+	// versions. This means iterating over all items, adding their hashes to
+	// the filter, then iterating over the indirected items and removing
+	// those that don't match the filter. The filter will give false
+	// positives so we will keep around one percent of things that we don't
+	// really need (at most).
 	//
-	// Block GC needs to run when there are no modifications to the FileInfos or
-	// block lists.
+	// Indirection GC needs to run when there are no modifications to the
+	// FileInfos or indirected items.
 
 	db.gcMut.Lock()
 	defer db.gcMut.Unlock()
```
```diff
@@ -540,30 +554,32 @@ func (db *Lowlevel) gcBlocks() error {
 	}
 	defer t.Release()
 
-	// Set up the bloom filter with the initial capacity and false positive
-	// rate, or higher capacity if we've done this before and seen lots of block
-	// lists.
+	// Set up the bloom filters with the initial capacity and false positive
+	// rate, or higher capacity if we've done this before and seen lots of
+	// items. For simplicity's sake we track just one count, which is the
+	// highest of the various indirected items.
 
-	capacity := blockGCBloomCapacity
+	capacity := indirectGCBloomCapacity
 	if db.gcKeyCount > capacity {
 		capacity = db.gcKeyCount
 	}
-	filter := bloom.NewWithEstimates(uint(capacity), blockGCBloomFalsePositiveRate)
+	blockFilter := bloom.NewWithEstimates(uint(capacity), indirectGCBloomFalsePositiveRate)
 
-	// Iterate the FileInfos, unmarshal the blocks hashes and add them to
-	// the filter.
+	// Iterate the FileInfos, unmarshal the block and version hashes and
+	// add them to the filter.
 
-	it, err := db.NewPrefixIterator([]byte{KeyTypeDevice})
+	it, err := t.NewPrefixIterator([]byte{KeyTypeDevice})
 	if err != nil {
 		return err
 	}
+	defer it.Release()
 	for it.Next() {
 		var bl BlocksHashOnly
 		if err := bl.Unmarshal(it.Value()); err != nil {
 			return err
 		}
 		if len(bl.BlocksHash) > 0 {
-			filter.Add(bl.BlocksHash)
+			blockFilter.Add(bl.BlocksHash)
 		}
 	}
 	it.Release()
```
```diff
@@ -574,15 +590,16 @@ func (db *Lowlevel) gcBlocks() error {
 	// Iterate over block lists, removing keys with hashes that don't match
 	// the filter.
 
-	it, err = db.NewPrefixIterator([]byte{KeyTypeBlockList})
+	it, err = t.NewPrefixIterator([]byte{KeyTypeBlockList})
 	if err != nil {
 		return err
 	}
-	matched := 0
+	defer it.Release()
+	matchedBlocks := 0
 	for it.Next() {
 		key := blockListKey(it.Key())
-		if filter.Test(key.BlocksHash()) {
-			matched++
+		if blockFilter.Test(key.BlocksHash()) {
+			matchedBlocks++
 			continue
 		}
 		if err := t.Delete(key); err != nil {
```
```diff
@@ -595,7 +612,7 @@ func (db *Lowlevel) gcBlocks() error {
 	}
 
 	// Remember the number of unique keys we kept until the next pass.
-	db.gcKeyCount = matched
+	db.gcKeyCount = matchedBlocks
 
 	if err := t.Commit(); err != nil {
 		return err
```
```diff
@@ -11,6 +11,8 @@ import (
 	"sort"
 	"testing"
 
+	"github.com/syncthing/syncthing/lib/db/backend"
+	"github.com/syncthing/syncthing/lib/fs"
 	"github.com/syncthing/syncthing/lib/protocol"
 )
```
```diff
@@ -101,3 +103,75 @@ func TestMetaSequences(t *testing.T) {
 		t.Error("sequence of first device should be 4, not", seq)
 	}
 }
+
+func TestRecalcMeta(t *testing.T) {
+	ldb := NewLowlevel(backend.OpenMemory())
+	defer ldb.Close()
+
+	// Add some files
+	s1 := NewFileSet("test", fs.NewFilesystem(fs.FilesystemTypeFake, "fake"), ldb)
+	files := []protocol.FileInfo{
+		{Name: "a", Size: 1000},
+		{Name: "b", Size: 2000},
+	}
+	s1.Update(protocol.LocalDeviceID, files)
+
+	// Verify local/global size
+	snap := s1.Snapshot()
+	ls := snap.LocalSize()
+	gs := snap.GlobalSize()
+	snap.Release()
+	if ls.Bytes != 3000 {
+		t.Fatalf("Wrong initial local byte count, %d != 3000", ls.Bytes)
+	}
+	if gs.Bytes != 3000 {
+		t.Fatalf("Wrong initial global byte count, %d != 3000", gs.Bytes)
+	}
+
+	// Reach into the database to make the metadata tracker intentionally
+	// wrong and out of date
+	curSeq := s1.meta.Sequence(protocol.LocalDeviceID)
+	tran, err := ldb.newReadWriteTransaction()
+	if err != nil {
+		t.Fatal(err)
+	}
+	s1.meta.mut.Lock()
+	s1.meta.countsPtr(protocol.LocalDeviceID, 0).Sequence = curSeq - 1 // too low
+	s1.meta.countsPtr(protocol.LocalDeviceID, 0).Bytes = 1234          // wrong
+	s1.meta.countsPtr(protocol.GlobalDeviceID, 0).Bytes = 1234         // wrong
+	s1.meta.dirty = true
+	s1.meta.mut.Unlock()
+	if err := s1.meta.toDB(tran, []byte("test")); err != nil {
+		t.Fatal(err)
+	}
+	if err := tran.Commit(); err != nil {
+		t.Fatal(err)
+	}
+
+	// Verify that our bad data "took"
+	snap = s1.Snapshot()
+	ls = snap.LocalSize()
+	gs = snap.GlobalSize()
+	snap.Release()
+	if ls.Bytes != 1234 {
+		t.Fatalf("Wrong changed local byte count, %d != 1234", ls.Bytes)
+	}
+	if gs.Bytes != 1234 {
+		t.Fatalf("Wrong changed global byte count, %d != 1234", gs.Bytes)
+	}
+
+	// Create a new fileset, which will realize the inconsistency and recalculate
+	s2 := NewFileSet("test", fs.NewFilesystem(fs.FilesystemTypeFake, "fake"), ldb)
+
+	// Verify local/global size
+	snap = s2.Snapshot()
+	ls = snap.LocalSize()
+	gs = snap.GlobalSize()
+	snap.Release()
+	if ls.Bytes != 3000 {
+		t.Fatalf("Wrong fixed local byte count, %d != 3000", ls.Bytes)
+	}
+	if gs.Bytes != 3000 {
+		t.Fatalf("Wrong fixed global byte count, %d != 3000", gs.Bytes)
+	}
+}
```
```diff
@@ -7,9 +7,11 @@
 package db
 
 import (
+	"bytes"
+	"fmt"
 	"strings"
 
 	"github.com/syncthing/syncthing/lib/db/backend"
 	"github.com/syncthing/syncthing/lib/protocol"
 )
```
```diff
@@ -23,11 +25,17 @@ import (
 //   6: v0.14.50
 //   7: v0.14.53
 //   8: v1.4.0
+//   9: v1.4.0
 const (
-	dbVersion             = 8
+	dbVersion             = 9
 	dbMinSyncthingVersion = "v1.4.0"
 )
 
+var (
+	errFolderIdxMissing = fmt.Errorf("folder db index missing")
+	errDeviceIdxMissing = fmt.Errorf("device db index missing")
+)
+
 type databaseDowngradeError struct {
 	minSyncthingVersion string
 }
```
```diff
@@ -49,6 +57,11 @@ type schemaUpdater struct {
 }
 
 func (db *schemaUpdater) updateSchema() error {
+	// Updating the schema can touch any and all parts of the database. Make
+	// sure we do not run GC concurrently with schema migrations.
+	db.gcMut.Lock()
+	defer db.gcMut.Unlock()
+
 	miscDB := NewMiscDataNamespace(db.Lowlevel)
 	prevVersion, _, err := miscDB.Int64("dbVersion")
 	if err != nil {
```
```diff
@@ -80,7 +93,7 @@ func (db *schemaUpdater) updateSchema() error {
 		{5, db.updateSchemaTo5},
 		{6, db.updateSchema5to6},
 		{7, db.updateSchema6to7},
-		{8, db.updateSchema7to8},
+		{9, db.updateSchemato9},
 	}
 
 	for _, m := range migrations {
```
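The migration table pairs each target schema version with the function that produces it, and versions at or below the previously stored one are skipped. A simplified sketch of that dispatch loop, with hypothetical names throughout; passing the previous version lets a migration skip work already done, as the `prev == 8` check in a later hunk does:

```go
package main

import "fmt"

type migration struct {
	targetVersion int64 // schema version this migration establishes
	convert       func(prev int64) error
}

func runMigrations(prev int64, migrations []migration) error {
	for _, m := range migrations {
		if prev >= m.targetVersion {
			continue // database is already at or past this version
		}
		if err := m.convert(prev); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	ms := []migration{
		{8, func(prev int64) error { fmt.Println("to 8 from", prev); return nil }},
		{9, func(prev int64) error { fmt.Println("to 9 from", prev); return nil }},
	}
	_ = runMigrations(8, ms) // only the 8→9 step runs
}
```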
```diff
@@ -421,8 +434,9 @@ func (db *schemaUpdater) updateSchema6to7(_ int) error {
 	return t.Commit()
 }
 
-func (db *schemaUpdater) updateSchema7to8(_ int) error {
+func (db *schemaUpdater) updateSchemato9(prev int) error {
 	// Loads and rewrites all files with blocks, to deduplicate block lists.
+	// Checks for missing or incorrect sequence entries and rewrites those.
 
 	t, err := db.newReadWriteTransaction()
 	if err != nil {
```
```diff
@@ -430,15 +444,69 @@ func (db *schemaUpdater) updateSchema7to8(_ int) error {
 	}
 	defer t.close()
 
+	var sk []byte
 	it, err := t.NewPrefixIterator([]byte{KeyTypeDevice})
 	if err != nil {
 		return err
 	}
+	metas := make(map[string]*metadataTracker)
 	for it.Next() {
-		var fi protocol.FileInfo
-		if err := fi.Unmarshal(it.Value()); err != nil {
+		intf, err := t.unmarshalTrunc(it.Value(), false)
+		if backend.IsNotFound(err) {
+			// Unmarshal error due to missing parts (block list), probably
+			// due to a bad migration in a previous RC. Drop this key, as
+			// getFile would anyway return this as a "not found" in the
+			// normal flow of things.
+			if err := t.Delete(it.Key()); err != nil {
+				return err
+			}
+			continue
+		} else if err != nil {
 			return err
 		}
+		fi := intf.(protocol.FileInfo)
+		device, ok := t.keyer.DeviceFromDeviceFileKey(it.Key())
+		if !ok {
+			return errDeviceIdxMissing
+		}
+		if bytes.Equal(device, protocol.LocalDeviceID[:]) {
+			folder, ok := t.keyer.FolderFromDeviceFileKey(it.Key())
+			if !ok {
+				return errFolderIdxMissing
+			}
+			if sk, err = t.keyer.GenerateSequenceKey(sk, folder, fi.Sequence); err != nil {
+				return err
+			}
+			switch dk, err := t.Get(sk); {
+			case err != nil:
+				if !backend.IsNotFound(err) {
+					return err
+				}
+				fallthrough
+			case !bytes.Equal(it.Key(), dk):
+				folderStr := string(folder)
+				meta, ok := metas[folderStr]
+				if !ok {
+					meta = loadMetadataTracker(db.Lowlevel, folderStr)
+					metas[folderStr] = meta
+				}
+				fi.Sequence = meta.nextLocalSeq()
+				if sk, err = t.keyer.GenerateSequenceKey(sk, folder, fi.Sequence); err != nil {
+					return err
+				}
+				if err := t.Put(sk, it.Key()); err != nil {
+					return err
+				}
+				if err := t.putFile(it.Key(), fi); err != nil {
+					return err
+				}
+				continue
+			}
+		}
+		if prev == 8 {
+			// The transition to 8 already did the changes below.
+			continue
+		}
 		if fi.Blocks == nil {
 			continue
 		}
```
```diff
@@ -451,7 +519,13 @@ func (db *schemaUpdater) updateSchema7to8(_ int) error {
 		return err
 	}
 
-	db.recordTime(blockGCTimeKey)
+	for folder, meta := range metas {
+		if err := meta.toDB(t, []byte(folder)); err != nil {
+			return err
+		}
+	}
+
+	db.recordTime(indirectGCTimeKey)
 
 	return t.Commit()
 }
```
```diff
@@ -71,74 +71,81 @@ func init() {
 }
 
 func NewFileSet(folder string, fs fs.Filesystem, db *Lowlevel) *FileSet {
-	var s = &FileSet{
+	return &FileSet{
 		folder:      folder,
 		fs:          fs,
 		db:          db,
-		meta:        newMetadataTracker(),
+		meta:        loadMetadataTracker(db, folder),
 		updateMutex: sync.NewMutex(),
 	}
+}
 
-	recalc := func() *FileSet {
-		if err := s.recalcMeta(); backend.IsClosed(err) {
+func loadMetadataTracker(db *Lowlevel, folder string) *metadataTracker {
+	recalc := func() *metadataTracker {
+		meta, err := recalcMeta(db, folder)
+		if backend.IsClosed(err) {
 			return nil
 		} else if err != nil {
 			panic(err)
 		}
-		return s
+		return meta
 	}
 
-	if err := s.meta.fromDB(db, []byte(folder)); err != nil {
+	meta := newMetadataTracker()
+	if err := meta.fromDB(db, []byte(folder)); err != nil {
 		l.Infof("No stored folder metadata for %q; recalculating", folder)
 		return recalc()
 	}
 
-	if metaOK := s.verifyLocalSequence(); !metaOK {
+	curSeq := meta.Sequence(protocol.LocalDeviceID)
+	if metaOK := verifyLocalSequence(curSeq, db, folder); !metaOK {
 		l.Infof("Stored folder metadata for %q is out of date after crash; recalculating", folder)
 		return recalc()
 	}
 
-	if age := time.Since(s.meta.Created()); age > databaseRecheckInterval {
+	if age := time.Since(meta.Created()); age > databaseRecheckInterval {
 		l.Infof("Stored folder metadata for %q is %v old; recalculating", folder, age)
 		return recalc()
 	}
 
-	return s
+	return meta
 }
 
-func (s *FileSet) recalcMeta() error {
-	s.meta = newMetadataTracker()
-
-	if err := s.db.checkGlobals([]byte(s.folder), s.meta); err != nil {
-		return err
+func recalcMeta(db *Lowlevel, folder string) (*metadataTracker, error) {
+	meta := newMetadataTracker()
+	if err := db.checkGlobals([]byte(folder), meta); err != nil {
+		return nil, err
 	}
 
-	t, err := s.db.newReadWriteTransaction()
+	t, err := db.newReadWriteTransaction()
 	if err != nil {
-		return err
+		return nil, err
 	}
 	defer t.close()
 
 	var deviceID protocol.DeviceID
-	err = t.withAllFolderTruncated([]byte(s.folder), func(device []byte, f FileInfoTruncated) bool {
+	err = t.withAllFolderTruncated([]byte(folder), func(device []byte, f FileInfoTruncated) bool {
 		copy(deviceID[:], device)
-		s.meta.addFile(deviceID, f)
+		meta.addFile(deviceID, f)
 		return true
 	})
 	if err != nil {
-		return err
+		return nil, err
 	}
 
-	s.meta.SetCreated()
-	if err := s.meta.toDB(t, []byte(s.folder)); err != nil {
-		return err
+	meta.SetCreated()
+	if err := meta.toDB(t, []byte(folder)); err != nil {
+		return nil, err
 	}
-	return t.Commit()
+	if err := t.Commit(); err != nil {
+		return nil, err
+	}
+	return meta, nil
 }
 
 // Verify the local sequence number from actual sequence entries. Returns
 // true if it was all good, or false if a fixup was necessary.
-func (s *FileSet) verifyLocalSequence() bool {
+func verifyLocalSequence(curSeq int64, db *Lowlevel, folder string) bool {
 	// Walk the sequence index from the current (supposedly) highest
 	// sequence number and raise the alarm if we get anything. This recovers
 	// from the occasion where we have written sequence entries to disk but
```
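loadMetadataTracker above follows a load-then-validate ladder: use the stored tracker only if it loads, survives a crash-consistency check, and isn't too old; otherwise rebuild from scratch. A condensed sketch of that control flow with stand-in types and names:

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

type tracker struct{ created time.Time }

const recheckInterval = 30 * 24 * time.Hour // stand-in for databaseRecheckInterval

func load(fromDB func() (*tracker, error), verify func(*tracker) bool, recalc func() *tracker) *tracker {
	m, err := fromDB()
	if err != nil {
		return recalc() // nothing stored, or unreadable
	}
	if !verify(m) {
		return recalc() // inconsistent after a crash
	}
	if time.Since(m.created) > recheckInterval {
		return recalc() // periodic full recheck of old metadata
	}
	return m
}

func main() {
	got := load(
		func() (*tracker, error) { return nil, errors.New("missing") },
		func(*tracker) bool { return true },
		func() *tracker { fmt.Println("recalculating"); return &tracker{created: time.Now()} },
	)
	fmt.Println(got != nil) // true
}
```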
```diff
@@ -149,15 +156,18 @@ func (s *FileSet) verifyLocalSequence() bool {
 	// number than we've actually seen and receive some duplicate updates
 	// and then be in sync again.
 
-	curSeq := s.meta.Sequence(protocol.LocalDeviceID)
-
-	snap := s.Snapshot()
+	t, err := db.newReadOnlyTransaction()
+	if err != nil {
+		panic(err)
+	}
 	ok := true
-	snap.WithHaveSequence(curSeq+1, func(fi FileIntf) bool {
+	if err := t.withHaveSequence([]byte(folder), curSeq+1, func(fi FileIntf) bool {
 		ok = false // we got something, which we should not have
 		return false
-	})
-	snap.Release()
+	}); err != nil && !backend.IsClosed(err) {
+		panic(err)
+	}
+	t.close()
 
 	return ok
 }
```
```diff
@@ -129,27 +129,36 @@ func (f FileInfoTruncated) FileModifiedBy() protocol.ShortID {
 }
 
 func (f FileInfoTruncated) ConvertToIgnoredFileInfo(by protocol.ShortID) protocol.FileInfo {
-	return protocol.FileInfo{
-		Name:         f.Name,
-		Type:         f.Type,
-		ModifiedS:    f.ModifiedS,
-		ModifiedNs:   f.ModifiedNs,
-		ModifiedBy:   by,
-		Version:      f.Version,
-		RawBlockSize: f.RawBlockSize,
-		LocalFlags:   protocol.FlagLocalIgnored,
-	}
+	file := f.copyToFileInfo()
+	file.SetIgnored(by)
+	return file
 }
 
-func (f FileInfoTruncated) ConvertToDeletedFileInfo(by protocol.ShortID, localFlags uint32) protocol.FileInfo {
+func (f FileInfoTruncated) ConvertToDeletedFileInfo(by protocol.ShortID) protocol.FileInfo {
+	file := f.copyToFileInfo()
+	file.SetDeleted(by)
+	return file
+}
+
+// copyToFileInfo just copies all members of FileInfoTruncated to protocol.FileInfo
+func (f FileInfoTruncated) copyToFileInfo() protocol.FileInfo {
 	return protocol.FileInfo{
-		Name:       f.Name,
-		Type:       f.Type,
-		ModifiedS:  time.Now().Unix(),
-		ModifiedBy: by,
-		Deleted:    true,
-		Version:    f.Version.Update(by),
-		LocalFlags: localFlags,
+		Name:          f.Name,
+		Size:          f.Size,
+		ModifiedS:     f.ModifiedS,
+		ModifiedBy:    f.ModifiedBy,
+		Version:       f.Version,
+		Sequence:      f.Sequence,
+		SymlinkTarget: f.SymlinkTarget,
+		BlocksHash:    f.BlocksHash,
+		Type:          f.Type,
+		Permissions:   f.Permissions,
+		ModifiedNs:    f.ModifiedNs,
+		RawBlockSize:  f.RawBlockSize,
+		LocalFlags:    f.LocalFlags,
+		Deleted:       f.Deleted,
+		RawInvalid:    f.RawInvalid,
+		NoPermissions: f.NoPermissions,
 	}
 }
```
```diff
@@ -78,30 +78,34 @@ func (t readOnlyTransaction) unmarshalTrunc(bs []byte, trunc bool) (FileIntf, error) {
 		return tf, nil
 	}
 
-	var tf protocol.FileInfo
-	if err := tf.Unmarshal(bs); err != nil {
+	var fi protocol.FileInfo
+	if err := fi.Unmarshal(bs); err != nil {
 		return nil, err
 	}
-	if err := t.fillBlockList(&tf); err != nil {
+	if err := t.fillFileInfo(&fi); err != nil {
 		return nil, err
 	}
-	return tf, nil
+	return fi, nil
 }
 
-func (t readOnlyTransaction) fillBlockList(fi *protocol.FileInfo) error {
-	if len(fi.BlocksHash) == 0 {
-		return nil
-	}
-	blocksKey := t.keyer.GenerateBlockListKey(nil, fi.BlocksHash)
-	bs, err := t.Get(blocksKey)
-	if err != nil {
-		return err
-	}
-	var bl BlockList
-	if err := bl.Unmarshal(bs); err != nil {
-		return err
-	}
-	fi.Blocks = bl.Blocks
+// fillFileInfo follows the (possible) indirection of blocks and fills it out.
+func (t readOnlyTransaction) fillFileInfo(fi *protocol.FileInfo) error {
+	var key []byte
+
+	if len(fi.Blocks) == 0 && len(fi.BlocksHash) != 0 {
+		// The blocks list is indirected and we need to load it.
+		key = t.keyer.GenerateBlockListKey(key, fi.BlocksHash)
+		bs, err := t.Get(key)
+		if err != nil {
+			return err
+		}
+		var bl BlockList
+		if err := bl.Unmarshal(bs); err != nil {
+			return err
+		}
+		fi.Blocks = bl.Blocks
+	}
 
 	return nil
 }
```
```diff
@@ -453,26 +457,33 @@ func (t readWriteTransaction) close() {
 	t.WriteTransaction.Release()
 }
 
-func (t readWriteTransaction) putFile(key []byte, fi protocol.FileInfo) error {
-	if fi.Blocks != nil {
-		if fi.BlocksHash == nil {
-			fi.BlocksHash = protocol.BlocksHash(fi.Blocks)
-		}
-		blocksKey := t.keyer.GenerateBlockListKey(nil, fi.BlocksHash)
-		if _, err := t.Get(blocksKey); backend.IsNotFound(err) {
+func (t readWriteTransaction) putFile(fkey []byte, fi protocol.FileInfo) error {
+	var bkey []byte
+
+	// Always set the blocks hash when there are blocks.
+	if len(fi.Blocks) > 0 {
+		fi.BlocksHash = protocol.BlocksHash(fi.Blocks)
+	} else {
+		fi.BlocksHash = nil
+	}
+
+	// Indirect the blocks if the block list is large enough.
+	if len(fi.Blocks) > blocksIndirectionCutoff {
+		bkey = t.keyer.GenerateBlockListKey(bkey, fi.BlocksHash)
+		if _, err := t.Get(bkey); backend.IsNotFound(err) {
 			// Marshal the block list and save it
 			blocksBs := mustMarshal(&BlockList{Blocks: fi.Blocks})
-			if err := t.Put(blocksKey, blocksBs); err != nil {
+			if err := t.Put(bkey, blocksBs); err != nil {
 				return err
 			}
 		} else if err != nil {
 			return err
 		}
+		fi.Blocks = nil
 	}
 
-	fi.Blocks = nil
 	fiBs := mustMarshal(&fi)
-	return t.Put(key, fiBs)
+	return t.Put(fkey, fiBs)
 }
 
 // updateGlobal adds this device+version to the version list for the given
```
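Taken together, putFile and fillFileInfo implement a simple indirection scheme: block lists longer than blocksIndirectionCutoff are stored once under their hash and referenced via BlocksHash (so identical lists are deduplicated), while short lists stay inline. A toy model of that round trip, with a map standing in for the database and illustrative types throughout:

```go
package main

import (
	"crypto/sha256"
	"fmt"
	"strings"
)

const blocksIndirectionCutoff = 3 // same cutoff as the constant above

type fileInfo struct {
	Blocks     []string
	BlocksHash []byte
}

var blockLists = map[string][]string{} // stand-in for the KeyTypeBlockList keyspace

func putFile(fi *fileInfo) {
	if len(fi.Blocks) > blocksIndirectionCutoff {
		h := sha256.Sum256([]byte(strings.Join(fi.Blocks, ",")))
		fi.BlocksHash = h[:]
		if _, ok := blockLists[string(fi.BlocksHash)]; !ok {
			blockLists[string(fi.BlocksHash)] = fi.Blocks // stored once, shared by identical lists
		}
		fi.Blocks = nil // only the hash is kept inline
	}
}

func fillFileInfo(fi *fileInfo) {
	if len(fi.Blocks) == 0 && len(fi.BlocksHash) != 0 {
		fi.Blocks = blockLists[string(fi.BlocksHash)] // follow the indirection
	}
}

func main() {
	fi := &fileInfo{Blocks: []string{"b1", "b2", "b3", "b4"}}
	putFile(fi)
	fmt.Println(fi.Blocks == nil, len(fi.BlocksHash)) // true 32: indirected
	fillFileInfo(fi)
	fmt.Println(len(fi.Blocks)) // 4: restored
}
```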
```diff
@@ -723,15 +734,12 @@ func (t *readWriteTransaction) withAllFolderTruncated(folder []byte, fn func(device []byte, f FileInfoTruncated) bool) error {
 			}
 			continue
 		}
-		var f FileInfoTruncated
-		// The iterator function may keep a reference to the unmarshalled
-		// struct, which in turn references the buffer it was unmarshalled
-		// from. dbi.Value() just returns an internal slice that it reuses, so
-		// we need to copy it.
-		err := f.Unmarshal(append([]byte{}, dbi.Value()...))
+
+		intf, err := t.unmarshalTrunc(dbi.Value(), true)
 		if err != nil {
 			return err
 		}
+		f := intf.(FileInfoTruncated)
 
 		switch f.Name {
 		case "", ".", "..", "/": // A few obviously invalid filenames
```
```diff
@@ -755,10 +763,7 @@ func (t *readWriteTransaction) withAllFolderTruncated(folder []byte, fn func(device []byte, f FileInfoTruncated) bool) error {
 			return nil
 		}
 	}
-	if err := dbi.Error(); err != nil {
-		return err
-	}
-	return t.Commit()
+	return dbi.Error()
 }
 
 type marshaller interface {
```
```diff
@@ -532,7 +532,8 @@ func (f *folder) scanSubdirs(subDirs []string) error {
 				}
 				return true
 			}
-			nf := file.ConvertToDeletedFileInfo(f.shortID, f.localFlags)
+			nf := file.ConvertToDeletedFileInfo(f.shortID)
+			nf.LocalFlags = f.localFlags
 			if file.ShouldConflict() {
 				// We do not want to override the global version with
 				// the deleted file. Setting to an empty version makes
```
```diff
@@ -104,14 +104,8 @@ func (f *receiveOnlyFolder) Revert() {
 			return true // continue
 		}
 
-		fi = protocol.FileInfo{
-			Name:       fi.Name,
-			Type:       fi.Type,
-			ModifiedS:  time.Now().Unix(),
-			ModifiedBy: f.shortID,
-			Deleted:    true,
-			Version:    protocol.Vector{}, // if this file ever resurfaces anywhere we want our delete to be strictly older
-		}
+		fi.SetDeleted(f.shortID)
+		fi.Version = protocol.Vector{} // if this file ever resurfaces anywhere we want our delete to be strictly older
 	} else {
 		// Revert means to throw away our local changes. We reset the
 		// version to the empty vector, which is strictly older than any
```
```diff
@@ -118,10 +118,7 @@ func (f *sendOnlyFolder) Override() {
 		}
 		if !ok || have.Name != need.Name {
 			// We are missing the file
-			need.Deleted = true
-			need.Blocks = nil
-			need.Version = need.Version.Update(f.shortID)
-			need.Size = 0
+			need.SetDeleted(f.shortID)
 		} else {
 			// We have the file, replace with our version
 			have.Version = have.Version.Merge(need.Version).Update(f.shortID)
```
```diff
@@ -270,7 +270,12 @@ func (c *folderSummaryService) calculateSummaries(ctx context.Context) {
 		case <-pump.C:
 			t0 := time.Now()
 			for _, folder := range c.foldersToHandle() {
-				c.sendSummary(folder)
+				select {
+				case <-ctx.Done():
+					return
+				default:
+				}
+				c.sendSummary(ctx, folder)
 			}
 
 			// We don't want to spend all our time calculating summaries. Lets
```
```diff
@@ -280,7 +285,7 @@ func (c *folderSummaryService) calculateSummaries(ctx context.Context) {
 			pump.Reset(wait)
 
 		case folder := <-c.immediate:
-			c.sendSummary(folder)
+			c.sendSummary(ctx, folder)
 
 		case <-ctx.Done():
 			return
```
```diff
@@ -313,7 +318,7 @@ func (c *folderSummaryService) foldersToHandle() []string {
 }
 
 // sendSummary send the summary events for a single folder
-func (c *folderSummaryService) sendSummary(folder string) {
+func (c *folderSummaryService) sendSummary(ctx context.Context, folder string) {
 	// The folder summary contains how many bytes, files etc
 	// are in the folder and how in sync we are.
 	data, err := c.Summary(folder)
```
```diff
@@ -326,6 +331,12 @@ func (c *folderSummaryService) sendSummary(folder string) {
 	})
 
 	for _, devCfg := range c.cfg.Folders()[folder].Devices {
+		select {
+		case <-ctx.Done():
+			return
+		default:
+		}
+
 		if devCfg.DeviceID.Equals(c.id) {
 			// We already know about ourselves.
 			continue
```
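The added select blocks are the standard non-blocking poll for cancellation: checking ctx.Done() alongside a default case lets a long loop bail out promptly once the context is cancelled, without blocking when it is still live. A self-contained sketch of the pattern:

```go
package main

import (
	"context"
	"fmt"
)

func process(ctx context.Context, items []string) {
	for _, item := range items {
		select {
		case <-ctx.Done():
			return // stop early once the context is cancelled
		default: // not cancelled: fall through and keep working
		}
		fmt.Println("handling", item)
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	cancel() // already cancelled, so process returns before handling anything
	process(ctx, []string{"a", "b"})
}
```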
```diff
@@ -266,24 +266,36 @@ func BlocksEqual(a, b []BlockInfo) bool {
 }
 
 func (f *FileInfo) SetMustRescan(by ShortID) {
-	f.LocalFlags = FlagLocalMustRescan
-	f.ModifiedBy = by
-	f.Blocks = nil
-	f.Sequence = 0
+	f.setLocalFlags(by, FlagLocalMustRescan)
 }
 
 func (f *FileInfo) SetIgnored(by ShortID) {
-	f.LocalFlags = FlagLocalIgnored
-	f.ModifiedBy = by
-	f.Blocks = nil
-	f.Sequence = 0
+	f.setLocalFlags(by, FlagLocalIgnored)
 }
 
 func (f *FileInfo) SetUnsupported(by ShortID) {
-	f.LocalFlags = FlagLocalUnsupported
+	f.setLocalFlags(by, FlagLocalUnsupported)
 }
 
+func (f *FileInfo) SetDeleted(by ShortID) {
+	f.ModifiedBy = by
+	f.Deleted = true
+	f.Version = f.Version.Update(by)
+	f.ModifiedS = time.Now().Unix()
+	f.setNoContent()
+}
+
+func (f *FileInfo) setLocalFlags(by ShortID, flags uint32) {
+	f.RawInvalid = false
+	f.LocalFlags = flags
+	f.ModifiedBy = by
+	f.setNoContent()
+}
+
+func (f *FileInfo) setNoContent() {
+	f.Blocks = nil
+	f.Sequence = 0
+	f.BlocksHash = nil
+	f.Size = 0
+}
+
 func (b BlockInfo) String() string {
```