Compare commits

...

12 Commits

Author SHA1 Message Date
Jakob Borg
db02545ef3 lib/db: Be more lenient during migration (fixes #6397) (#6398) 2020-03-06 20:52:22 +01:00
Jakob Borg
52e72e0122 lib/db: Prevent GC concurrently with migration (fixes #6389) (#6390) 2020-02-29 19:51:48 +01:00
Evgeny Kuznetsov
d1e0a38c04 build: Fix syso creation (fixes #6386) (#6387) 2020-02-29 19:48:42 +01:00
Jakob Borg
0b610017ea lib/db: Correct metadata recalculation (fixes #6381) (#6382)
If we decide to recalculate the metadata we shouldn't start from
whatever we loaded from the database, as that data is wrong. We should
start from a clean slate.
2020-02-28 11:17:02 +01:00
Jakob Borg
9a1df97c69 lib/db: Remove reference to env var that never existed 2020-02-27 11:22:09 +01:00
Jakob Borg
ee61da5b6a lib/db: Slightly improve indirection (ref #6372) (#6373)
I was working on indirecting version vectors, and that resulted in some
refactoring and improving the existing block indirection stuff. We may
or may not end up doing the version vector indirection, but I think
these changes are reasonable anyhow and will simplify the diff
significantly if we do go there. The main points are:

- A bunch of renaming to make the indirection and GC not about "blocks"
  but about "indirection".

- Adding a cutoff so that we don't actually indirect for small block
  lists. This gets us better performance when handling small files as it
  cuts out the indirection for quite small loss in space efficiency.

- Being paranoid and always recalculating the hash on put. This costs
  some CPU, but the consequences if a buggy or malicious implementation
  silently substituted the block list by lying about the hash would be bad.
2020-02-27 11:22:01 +01:00
Jakob Borg
a5e12a0a3d lib/db: Allow put partial FileInfo without blocks (ref #6353) 2020-02-22 17:49:23 +01:00
Simon Frei
4f29180e7c lib/db: Don't panic on incorrect BlocksHash (fixes #6353) (#6355) 2020-02-22 16:52:34 +01:00
Simon Frei
0fb2cd52ff lib/db: Schema update to repair sequence index (ref #6304) (#6350) 2020-02-22 09:37:21 +01:00
Simon Frei
a4bd4d118a lib: Modify FileInfos consistently (ref #6321) (#6349) 2020-02-22 09:31:26 +01:00
Jakob Borg
bb375b1aff lib/model: Stop summary sender faster (ref #6319) (#6341)
One of the causes of "panic: database is closed" is that we try to send
summaries after it's been closed. Calculating summaries can take a long
time and if we have a lot of folders it's not unreasonable to think
that we might be stopped in this loop, so prepare to bail here.

* push
2020-02-14 08:11:54 +01:00
Simon Frei
05e23f1991 lib/db: Don't call backend.Commit twice (ref #6337) (#6342) 2020-02-14 08:11:24 +01:00
14 changed files with 380 additions and 163 deletions

View File

@@ -654,7 +654,11 @@ func shouldBuildSyso(dir string) (string, error) {
}
jsonPath := filepath.Join(dir, "versioninfo.json")
ioutil.WriteFile(jsonPath, bs, 0644)
err = ioutil.WriteFile(jsonPath, bs, 0644)
if err != nil {
return "", errors.New("failed to create " + jsonPath + ": " + err.Error())
}
defer func() {
if err := os.Remove(jsonPath); err != nil {
log.Printf("Warning: unable to remove generated %s: %v. Please remove it manually.", jsonPath, err)
@@ -860,13 +864,22 @@ func getVersion() string {
return "unknown-dev"
}
func semanticVersion() (major, minor, patch, build string) {
func semanticVersion() (major, minor, patch, build int) {
r := regexp.MustCompile(`v(?P<Major>\d+)\.(?P<Minor>\d+).(?P<Patch>\d+).*\+(?P<CommitsAhead>\d+)`)
matches := r.FindStringSubmatch(getVersion())
if len(matches) != 5 {
return "0", "0", "0", "0"
return 0, 0, 0, 0
}
return matches[1], matches[2], matches[3], matches[4]
var ints [4]int
for i := 1; i < 5; i++ {
value, err := strconv.Atoi(matches[i])
if err != nil {
return 0, 0, 0, 0
}
ints[i-1] = value
}
return ints[0], ints[1], ints[2], ints[3]
}
func getBranchSuffix() string {

View File

@@ -148,7 +148,7 @@ func idxck(ldb backend.Backend) (success bool) {
}
}
if fi.BlocksHash != nil {
if len(fi.Blocks) == 0 && len(fi.BlocksHash) != 0 {
key := string(fi.BlocksHash)
if _, ok := blocklists[key]; !ok {
fmt.Printf("Missing block list for file %q, block list hash %x\n", fi.Name, fi.BlocksHash)

View File

@@ -119,8 +119,8 @@ are mostly useful for developers. Use with care.
"h", "m" and "s" abbreviations for hours minutes and seconds.
Valid values are like "720h", "30s", etc.
STGCBLOCKSEVERY Set to a time interval to override the default database
block GC interval of 13 hours. Same format as the
STGCINDIRECTEVERY Set to a time interval to override the default database
indirection GC interval of 13 hours. Same format as the
STRECHECKDBEVERY variable.
GOMAXPROCS Set the maximum number of CPU cores to use. Defaults to all

View File

@@ -19,22 +19,25 @@ import (
)
const (
// We set the bloom filter capacity to handle 100k individual block lists
// with a false positive probability of 1% for the first pass. Once we know
// how many block lists we have we will use that number instead, if it's
// more than 100k. For fewer than 100k block lists we will just get better
// false positive rate instead.
blockGCBloomCapacity = 100000
blockGCBloomFalsePositiveRate = 0.01 // 1%
blockGCDefaultInterval = 13 * time.Hour
blockGCTimeKey = "lastBlockGCTime"
// We set the bloom filter capacity to handle 100k individual items with
// a false positive probability of 1% for the first pass. Once we know
// how many items we have we will use that number instead, if it's more
// than 100k. For fewer than 100k items we will just get better false
// positive rate instead.
indirectGCBloomCapacity = 100000
indirectGCBloomFalsePositiveRate = 0.01 // 1%
indirectGCDefaultInterval = 13 * time.Hour
indirectGCTimeKey = "lastIndirectGCTime"
// Use indirection for the block list when it exceeds this many entries
blocksIndirectionCutoff = 3
)
var blockGCInterval = blockGCDefaultInterval
var indirectGCInterval = indirectGCDefaultInterval
func init() {
if dur, err := time.ParseDuration(os.Getenv("STGCBLOCKSEVERY")); err == nil {
blockGCInterval = dur
if dur, err := time.ParseDuration(os.Getenv("STGCINDIRECTEVERY")); err == nil {
indirectGCInterval = dur
}
}
@@ -485,18 +488,28 @@ func (db *Lowlevel) dropPrefix(prefix []byte) error {
}
func (db *Lowlevel) gcRunner() {
t := time.NewTimer(db.timeUntil(blockGCTimeKey, blockGCInterval))
// Calculate the time for the next GC run. Even if we should run GC
// directly, give the system a while to get up and running and do other
// stuff first. (We might have migrations and stuff which would be
// better off running before GC.)
next := db.timeUntil(indirectGCTimeKey, indirectGCInterval)
if next < time.Minute {
next = time.Minute
}
t := time.NewTimer(next)
defer t.Stop()
for {
select {
case <-db.gcStop:
return
case <-t.C:
if err := db.gcBlocks(); err != nil {
l.Warnln("Database block GC failed:", err)
if err := db.gcIndirect(); err != nil {
l.Warnln("Database indirection GC failed:", err)
}
db.recordTime(blockGCTimeKey)
t.Reset(db.timeUntil(blockGCTimeKey, blockGCInterval))
db.recordTime(indirectGCTimeKey)
t.Reset(db.timeUntil(indirectGCTimeKey, indirectGCInterval))
}
}
}
@@ -521,15 +534,16 @@ func (db *Lowlevel) timeUntil(key string, every time.Duration) time.Duration {
return sleepTime
}
func (db *Lowlevel) gcBlocks() error {
// The block GC uses a bloom filter to track used block lists. This means
// iterating over all items, adding their block lists to the filter, then
// iterating over the block lists and removing those that don't match the
// filter. The filter will give false positives so we will keep around one
// percent of block lists that we don't really need (at most).
func (db *Lowlevel) gcIndirect() error {
// The indirection GC uses bloom filters to track used block lists and
// versions. This means iterating over all items, adding their hashes to
// the filter, then iterating over the indirected items and removing
// those that don't match the filter. The filter will give false
// positives so we will keep around one percent of things that we don't
// really need (at most).
//
// Block GC needs to run when there are no modifications to the FileInfos or
// block lists.
// Indirection GC needs to run when there are no modifications to the
// FileInfos or indirected items.
db.gcMut.Lock()
defer db.gcMut.Unlock()
@@ -540,30 +554,32 @@ func (db *Lowlevel) gcBlocks() error {
}
defer t.Release()
// Set up the bloom filter with the initial capacity and false positive
// rate, or higher capacity if we've done this before and seen lots of block
// lists.
// Set up the bloom filters with the initial capacity and false positive
// rate, or higher capacity if we've done this before and seen lots of
// items. For simplicity's sake we track just one count, which is the
// highest of the various indirected items.
capacity := blockGCBloomCapacity
capacity := indirectGCBloomCapacity
if db.gcKeyCount > capacity {
capacity = db.gcKeyCount
}
filter := bloom.NewWithEstimates(uint(capacity), blockGCBloomFalsePositiveRate)
blockFilter := bloom.NewWithEstimates(uint(capacity), indirectGCBloomFalsePositiveRate)
// Iterate the FileInfos, unmarshal the blocks hashes and add them to
// the filter.
// Iterate the FileInfos, unmarshal the block and version hashes and
// add them to the filter.
it, err := db.NewPrefixIterator([]byte{KeyTypeDevice})
it, err := t.NewPrefixIterator([]byte{KeyTypeDevice})
if err != nil {
return err
}
defer it.Release()
for it.Next() {
var bl BlocksHashOnly
if err := bl.Unmarshal(it.Value()); err != nil {
return err
}
if len(bl.BlocksHash) > 0 {
filter.Add(bl.BlocksHash)
blockFilter.Add(bl.BlocksHash)
}
}
it.Release()
@@ -574,15 +590,16 @@ func (db *Lowlevel) gcBlocks() error {
// Iterate over block lists, removing keys with hashes that don't match
// the filter.
it, err = db.NewPrefixIterator([]byte{KeyTypeBlockList})
it, err = t.NewPrefixIterator([]byte{KeyTypeBlockList})
if err != nil {
return err
}
matched := 0
defer it.Release()
matchedBlocks := 0
for it.Next() {
key := blockListKey(it.Key())
if filter.Test(key.BlocksHash()) {
matched++
if blockFilter.Test(key.BlocksHash()) {
matchedBlocks++
continue
}
if err := t.Delete(key); err != nil {
@@ -595,7 +612,7 @@ func (db *Lowlevel) gcBlocks() error {
}
// Remember the number of unique keys we kept until the next pass.
db.gcKeyCount = matched
db.gcKeyCount = matchedBlocks
if err := t.Commit(); err != nil {
return err

View File

@@ -11,6 +11,8 @@ import (
"sort"
"testing"
"github.com/syncthing/syncthing/lib/db/backend"
"github.com/syncthing/syncthing/lib/fs"
"github.com/syncthing/syncthing/lib/protocol"
)
@@ -101,3 +103,75 @@ func TestMetaSequences(t *testing.T) {
t.Error("sequence of first device should be 4, not", seq)
}
}
func TestRecalcMeta(t *testing.T) {
ldb := NewLowlevel(backend.OpenMemory())
defer ldb.Close()
// Add some files
s1 := NewFileSet("test", fs.NewFilesystem(fs.FilesystemTypeFake, "fake"), ldb)
files := []protocol.FileInfo{
{Name: "a", Size: 1000},
{Name: "b", Size: 2000},
}
s1.Update(protocol.LocalDeviceID, files)
// Verify local/global size
snap := s1.Snapshot()
ls := snap.LocalSize()
gs := snap.GlobalSize()
snap.Release()
if ls.Bytes != 3000 {
t.Fatalf("Wrong initial local byte count, %d != 3000", ls.Bytes)
}
if gs.Bytes != 3000 {
t.Fatalf("Wrong initial global byte count, %d != 3000", gs.Bytes)
}
// Reach into the database to make the metadata tracker intentionally
// wrong and out of date
curSeq := s1.meta.Sequence(protocol.LocalDeviceID)
tran, err := ldb.newReadWriteTransaction()
if err != nil {
t.Fatal(err)
}
s1.meta.mut.Lock()
s1.meta.countsPtr(protocol.LocalDeviceID, 0).Sequence = curSeq - 1 // too low
s1.meta.countsPtr(protocol.LocalDeviceID, 0).Bytes = 1234 // wrong
s1.meta.countsPtr(protocol.GlobalDeviceID, 0).Bytes = 1234 // wrong
s1.meta.dirty = true
s1.meta.mut.Unlock()
if err := s1.meta.toDB(tran, []byte("test")); err != nil {
t.Fatal(err)
}
if err := tran.Commit(); err != nil {
t.Fatal(err)
}
// Verify that our bad data "took"
snap = s1.Snapshot()
ls = snap.LocalSize()
gs = snap.GlobalSize()
snap.Release()
if ls.Bytes != 1234 {
t.Fatalf("Wrong changed local byte count, %d != 1234", ls.Bytes)
}
if gs.Bytes != 1234 {
t.Fatalf("Wrong changed global byte count, %d != 1234", gs.Bytes)
}
// Create a new fileset, which will realize the inconsistency and recalculate
s2 := NewFileSet("test", fs.NewFilesystem(fs.FilesystemTypeFake, "fake"), ldb)
// Verify local/global size
snap = s2.Snapshot()
ls = snap.LocalSize()
gs = snap.GlobalSize()
snap.Release()
if ls.Bytes != 3000 {
t.Fatalf("Wrong fixed local byte count, %d != 3000", ls.Bytes)
}
if gs.Bytes != 3000 {
t.Fatalf("Wrong fixed global byte count, %d != 3000", gs.Bytes)
}
}

View File

@@ -7,9 +7,11 @@
package db
import (
"bytes"
"fmt"
"strings"
"github.com/syncthing/syncthing/lib/db/backend"
"github.com/syncthing/syncthing/lib/protocol"
)
@@ -23,11 +25,17 @@ import (
// 6: v0.14.50
// 7: v0.14.53
// 8: v1.4.0
// 9: v1.4.0
const (
dbVersion = 8
dbVersion = 9
dbMinSyncthingVersion = "v1.4.0"
)
var (
errFolderIdxMissing = fmt.Errorf("folder db index missing")
errDeviceIdxMissing = fmt.Errorf("device db index missing")
)
type databaseDowngradeError struct {
minSyncthingVersion string
}
@@ -49,6 +57,11 @@ type schemaUpdater struct {
}
func (db *schemaUpdater) updateSchema() error {
// Updating the schema can touch any and all parts of the database. Make
// sure we do not run GC concurrently with schema migrations.
db.gcMut.Lock()
defer db.gcMut.Unlock()
miscDB := NewMiscDataNamespace(db.Lowlevel)
prevVersion, _, err := miscDB.Int64("dbVersion")
if err != nil {
@@ -80,7 +93,7 @@ func (db *schemaUpdater) updateSchema() error {
{5, db.updateSchemaTo5},
{6, db.updateSchema5to6},
{7, db.updateSchema6to7},
{8, db.updateSchema7to8},
{9, db.updateSchemato9},
}
for _, m := range migrations {
@@ -421,8 +434,9 @@ func (db *schemaUpdater) updateSchema6to7(_ int) error {
return t.Commit()
}
func (db *schemaUpdater) updateSchema7to8(_ int) error {
func (db *schemaUpdater) updateSchemato9(prev int) error {
// Loads and rewrites all files with blocks, to deduplicate block lists.
// Checks for missing or incorrect sequence entries and rewrites those.
t, err := db.newReadWriteTransaction()
if err != nil {
@@ -430,15 +444,69 @@ func (db *schemaUpdater) updateSchema7to8(_ int) error {
}
defer t.close()
var sk []byte
it, err := t.NewPrefixIterator([]byte{KeyTypeDevice})
if err != nil {
return err
}
metas := make(map[string]*metadataTracker)
for it.Next() {
var fi protocol.FileInfo
if err := fi.Unmarshal(it.Value()); err != nil {
intf, err := t.unmarshalTrunc(it.Value(), false)
if backend.IsNotFound(err) {
// Unmarshal error due to missing parts (block list), probably
// due to a bad migration in a previous RC. Drop this key, as
// getFile would anyway return this as a "not found" in the
// normal flow of things.
if err := t.Delete(it.Key()); err != nil {
return err
}
continue
} else if err != nil {
return err
}
fi := intf.(protocol.FileInfo)
device, ok := t.keyer.DeviceFromDeviceFileKey(it.Key())
if !ok {
return errDeviceIdxMissing
}
if bytes.Equal(device, protocol.LocalDeviceID[:]) {
folder, ok := t.keyer.FolderFromDeviceFileKey(it.Key())
if !ok {
return errFolderIdxMissing
}
if sk, err = t.keyer.GenerateSequenceKey(sk, folder, fi.Sequence); err != nil {
return err
}
switch dk, err := t.Get(sk); {
case err != nil:
if !backend.IsNotFound(err) {
return err
}
fallthrough
case !bytes.Equal(it.Key(), dk):
folderStr := string(folder)
meta, ok := metas[folderStr]
if !ok {
meta = loadMetadataTracker(db.Lowlevel, folderStr)
metas[folderStr] = meta
}
fi.Sequence = meta.nextLocalSeq()
if sk, err = t.keyer.GenerateSequenceKey(sk, folder, fi.Sequence); err != nil {
return err
}
if err := t.Put(sk, it.Key()); err != nil {
return err
}
if err := t.putFile(it.Key(), fi); err != nil {
return err
}
continue
}
}
if prev == 8 {
// The transition to 8 already did the changes below.
continue
}
if fi.Blocks == nil {
continue
}
@@ -451,7 +519,13 @@ func (db *schemaUpdater) updateSchema7to8(_ int) error {
return err
}
db.recordTime(blockGCTimeKey)
for folder, meta := range metas {
if err := meta.toDB(t, []byte(folder)); err != nil {
return err
}
}
db.recordTime(indirectGCTimeKey)
return t.Commit()
}

View File

@@ -71,74 +71,81 @@ func init() {
}
func NewFileSet(folder string, fs fs.Filesystem, db *Lowlevel) *FileSet {
var s = &FileSet{
return &FileSet{
folder: folder,
fs: fs,
db: db,
meta: newMetadataTracker(),
meta: loadMetadataTracker(db, folder),
updateMutex: sync.NewMutex(),
}
}
recalc := func() *FileSet {
if err := s.recalcMeta(); backend.IsClosed(err) {
func loadMetadataTracker(db *Lowlevel, folder string) *metadataTracker {
recalc := func() *metadataTracker {
meta, err := recalcMeta(db, folder)
if backend.IsClosed(err) {
return nil
} else if err != nil {
panic(err)
}
return s
return meta
}
if err := s.meta.fromDB(db, []byte(folder)); err != nil {
meta := newMetadataTracker()
if err := meta.fromDB(db, []byte(folder)); err != nil {
l.Infof("No stored folder metadata for %q; recalculating", folder)
return recalc()
}
if metaOK := s.verifyLocalSequence(); !metaOK {
curSeq := meta.Sequence(protocol.LocalDeviceID)
if metaOK := verifyLocalSequence(curSeq, db, folder); !metaOK {
l.Infof("Stored folder metadata for %q is out of date after crash; recalculating", folder)
return recalc()
}
if age := time.Since(s.meta.Created()); age > databaseRecheckInterval {
if age := time.Since(meta.Created()); age > databaseRecheckInterval {
l.Infof("Stored folder metadata for %q is %v old; recalculating", folder, age)
return recalc()
}
return s
return meta
}
func (s *FileSet) recalcMeta() error {
s.meta = newMetadataTracker()
if err := s.db.checkGlobals([]byte(s.folder), s.meta); err != nil {
return err
func recalcMeta(db *Lowlevel, folder string) (*metadataTracker, error) {
meta := newMetadataTracker()
if err := db.checkGlobals([]byte(folder), meta); err != nil {
return nil, err
}
t, err := s.db.newReadWriteTransaction()
t, err := db.newReadWriteTransaction()
if err != nil {
return err
return nil, err
}
defer t.close()
var deviceID protocol.DeviceID
err = t.withAllFolderTruncated([]byte(s.folder), func(device []byte, f FileInfoTruncated) bool {
err = t.withAllFolderTruncated([]byte(folder), func(device []byte, f FileInfoTruncated) bool {
copy(deviceID[:], device)
s.meta.addFile(deviceID, f)
meta.addFile(deviceID, f)
return true
})
if err != nil {
return err
return nil, err
}
s.meta.SetCreated()
if err := s.meta.toDB(t, []byte(s.folder)); err != nil {
return err
meta.SetCreated()
if err := meta.toDB(t, []byte(folder)); err != nil {
return nil, err
}
return t.Commit()
if err := t.Commit(); err != nil {
return nil, err
}
return meta, nil
}
// Verify the local sequence number from actual sequence entries. Returns
// true if it was all good, or false if a fixup was necessary.
func (s *FileSet) verifyLocalSequence() bool {
func verifyLocalSequence(curSeq int64, db *Lowlevel, folder string) bool {
// Walk the sequence index from the current (supposedly) highest
// sequence number and raise the alarm if we get anything. This recovers
// from the occasion where we have written sequence entries to disk but
@@ -149,15 +156,18 @@ func (s *FileSet) verifyLocalSequence() bool {
// number than we've actually seen and receive some duplicate updates
// and then be in sync again.
curSeq := s.meta.Sequence(protocol.LocalDeviceID)
snap := s.Snapshot()
t, err := db.newReadOnlyTransaction()
if err != nil {
panic(err)
}
ok := true
snap.WithHaveSequence(curSeq+1, func(fi FileIntf) bool {
if err := t.withHaveSequence([]byte(folder), curSeq+1, func(fi FileIntf) bool {
ok = false // we got something, which we should not have
return false
})
snap.Release()
}); err != nil && !backend.IsClosed(err) {
panic(err)
}
t.close()
return ok
}

View File

@@ -129,27 +129,36 @@ func (f FileInfoTruncated) FileModifiedBy() protocol.ShortID {
}
func (f FileInfoTruncated) ConvertToIgnoredFileInfo(by protocol.ShortID) protocol.FileInfo {
return protocol.FileInfo{
Name: f.Name,
Type: f.Type,
ModifiedS: f.ModifiedS,
ModifiedNs: f.ModifiedNs,
ModifiedBy: by,
Version: f.Version,
RawBlockSize: f.RawBlockSize,
LocalFlags: protocol.FlagLocalIgnored,
}
file := f.copyToFileInfo()
file.SetIgnored(by)
return file
}
func (f FileInfoTruncated) ConvertToDeletedFileInfo(by protocol.ShortID, localFlags uint32) protocol.FileInfo {
func (f FileInfoTruncated) ConvertToDeletedFileInfo(by protocol.ShortID) protocol.FileInfo {
file := f.copyToFileInfo()
file.SetDeleted(by)
return file
}
// copyToFileInfo just copies all members of FileInfoTruncated to protocol.FileInfo
func (f FileInfoTruncated) copyToFileInfo() protocol.FileInfo {
return protocol.FileInfo{
Name: f.Name,
Type: f.Type,
ModifiedS: time.Now().Unix(),
ModifiedBy: by,
Deleted: true,
Version: f.Version.Update(by),
LocalFlags: localFlags,
Name: f.Name,
Size: f.Size,
ModifiedS: f.ModifiedS,
ModifiedBy: f.ModifiedBy,
Version: f.Version,
Sequence: f.Sequence,
SymlinkTarget: f.SymlinkTarget,
BlocksHash: f.BlocksHash,
Type: f.Type,
Permissions: f.Permissions,
ModifiedNs: f.ModifiedNs,
RawBlockSize: f.RawBlockSize,
LocalFlags: f.LocalFlags,
Deleted: f.Deleted,
RawInvalid: f.RawInvalid,
NoPermissions: f.NoPermissions,
}
}

View File

@@ -78,30 +78,34 @@ func (t readOnlyTransaction) unmarshalTrunc(bs []byte, trunc bool) (FileIntf, er
return tf, nil
}
var tf protocol.FileInfo
if err := tf.Unmarshal(bs); err != nil {
var fi protocol.FileInfo
if err := fi.Unmarshal(bs); err != nil {
return nil, err
}
if err := t.fillBlockList(&tf); err != nil {
if err := t.fillFileInfo(&fi); err != nil {
return nil, err
}
return tf, nil
return fi, nil
}
func (t readOnlyTransaction) fillBlockList(fi *protocol.FileInfo) error {
if len(fi.BlocksHash) == 0 {
return nil
// fillFileInfo follows the (possible) indirection of blocks and fills it out.
func (t readOnlyTransaction) fillFileInfo(fi *protocol.FileInfo) error {
var key []byte
if len(fi.Blocks) == 0 && len(fi.BlocksHash) != 0 {
// The blocks list is indirected and we need to load it.
key = t.keyer.GenerateBlockListKey(key, fi.BlocksHash)
bs, err := t.Get(key)
if err != nil {
return err
}
var bl BlockList
if err := bl.Unmarshal(bs); err != nil {
return err
}
fi.Blocks = bl.Blocks
}
blocksKey := t.keyer.GenerateBlockListKey(nil, fi.BlocksHash)
bs, err := t.Get(blocksKey)
if err != nil {
return err
}
var bl BlockList
if err := bl.Unmarshal(bs); err != nil {
return err
}
fi.Blocks = bl.Blocks
return nil
}
@@ -453,26 +457,33 @@ func (t readWriteTransaction) close() {
t.WriteTransaction.Release()
}
func (t readWriteTransaction) putFile(key []byte, fi protocol.FileInfo) error {
if fi.Blocks != nil {
if fi.BlocksHash == nil {
fi.BlocksHash = protocol.BlocksHash(fi.Blocks)
}
blocksKey := t.keyer.GenerateBlockListKey(nil, fi.BlocksHash)
if _, err := t.Get(blocksKey); backend.IsNotFound(err) {
func (t readWriteTransaction) putFile(fkey []byte, fi protocol.FileInfo) error {
var bkey []byte
// Always set the blocks hash when there are blocks.
if len(fi.Blocks) > 0 {
fi.BlocksHash = protocol.BlocksHash(fi.Blocks)
} else {
fi.BlocksHash = nil
}
// Indirect the blocks if the block list is large enough.
if len(fi.Blocks) > blocksIndirectionCutoff {
bkey = t.keyer.GenerateBlockListKey(bkey, fi.BlocksHash)
if _, err := t.Get(bkey); backend.IsNotFound(err) {
// Marshal the block list and save it
blocksBs := mustMarshal(&BlockList{Blocks: fi.Blocks})
if err := t.Put(blocksKey, blocksBs); err != nil {
if err := t.Put(bkey, blocksBs); err != nil {
return err
}
} else if err != nil {
return err
}
fi.Blocks = nil
}
fi.Blocks = nil
fiBs := mustMarshal(&fi)
return t.Put(key, fiBs)
return t.Put(fkey, fiBs)
}
// updateGlobal adds this device+version to the version list for the given
@@ -723,15 +734,12 @@ func (t *readWriteTransaction) withAllFolderTruncated(folder []byte, fn func(dev
}
continue
}
var f FileInfoTruncated
// The iterator function may keep a reference to the unmarshalled
// struct, which in turn references the buffer it was unmarshalled
// from. dbi.Value() just returns an internal slice that it reuses, so
// we need to copy it.
err := f.Unmarshal(append([]byte{}, dbi.Value()...))
intf, err := t.unmarshalTrunc(dbi.Value(), true)
if err != nil {
return err
}
f := intf.(FileInfoTruncated)
switch f.Name {
case "", ".", "..", "/": // A few obviously invalid filenames
@@ -755,10 +763,7 @@ func (t *readWriteTransaction) withAllFolderTruncated(folder []byte, fn func(dev
return nil
}
}
if err := dbi.Error(); err != nil {
return err
}
return t.Commit()
return dbi.Error()
}
type marshaller interface {

View File

@@ -532,7 +532,8 @@ func (f *folder) scanSubdirs(subDirs []string) error {
}
return true
}
nf := file.ConvertToDeletedFileInfo(f.shortID, f.localFlags)
nf := file.ConvertToDeletedFileInfo(f.shortID)
nf.LocalFlags = f.localFlags
if file.ShouldConflict() {
// We do not want to override the global version with
// the deleted file. Setting to an empty version makes

View File

@@ -104,14 +104,8 @@ func (f *receiveOnlyFolder) Revert() {
return true // continue
}
fi = protocol.FileInfo{
Name: fi.Name,
Type: fi.Type,
ModifiedS: time.Now().Unix(),
ModifiedBy: f.shortID,
Deleted: true,
Version: protocol.Vector{}, // if this file ever resurfaces anywhere we want our delete to be strictly older
}
fi.SetDeleted(f.shortID)
fi.Version = protocol.Vector{} // if this file ever resurfaces anywhere we want our delete to be strictly older
} else {
// Revert means to throw away our local changes. We reset the
// version to the empty vector, which is strictly older than any

View File

@@ -118,10 +118,7 @@ func (f *sendOnlyFolder) Override() {
}
if !ok || have.Name != need.Name {
// We are missing the file
need.Deleted = true
need.Blocks = nil
need.Version = need.Version.Update(f.shortID)
need.Size = 0
need.SetDeleted(f.shortID)
} else {
// We have the file, replace with our version
have.Version = have.Version.Merge(need.Version).Update(f.shortID)

View File

@@ -270,7 +270,12 @@ func (c *folderSummaryService) calculateSummaries(ctx context.Context) {
case <-pump.C:
t0 := time.Now()
for _, folder := range c.foldersToHandle() {
c.sendSummary(folder)
select {
case <-ctx.Done():
return
default:
}
c.sendSummary(ctx, folder)
}
// We don't want to spend all our time calculating summaries. Lets
@@ -280,7 +285,7 @@ func (c *folderSummaryService) calculateSummaries(ctx context.Context) {
pump.Reset(wait)
case folder := <-c.immediate:
c.sendSummary(folder)
c.sendSummary(ctx, folder)
case <-ctx.Done():
return
@@ -313,7 +318,7 @@ func (c *folderSummaryService) foldersToHandle() []string {
}
// sendSummary send the summary events for a single folder
func (c *folderSummaryService) sendSummary(folder string) {
func (c *folderSummaryService) sendSummary(ctx context.Context, folder string) {
// The folder summary contains how many bytes, files etc
// are in the folder and how in sync we are.
data, err := c.Summary(folder)
@@ -326,6 +331,12 @@ func (c *folderSummaryService) sendSummary(folder string) {
})
for _, devCfg := range c.cfg.Folders()[folder].Devices {
select {
case <-ctx.Done():
return
default:
}
if devCfg.DeviceID.Equals(c.id) {
// We already know about ourselves.
continue

View File

@@ -266,24 +266,36 @@ func BlocksEqual(a, b []BlockInfo) bool {
}
func (f *FileInfo) SetMustRescan(by ShortID) {
f.LocalFlags = FlagLocalMustRescan
f.ModifiedBy = by
f.Blocks = nil
f.Sequence = 0
f.setLocalFlags(by, FlagLocalMustRescan)
}
func (f *FileInfo) SetIgnored(by ShortID) {
f.LocalFlags = FlagLocalIgnored
f.ModifiedBy = by
f.Blocks = nil
f.Sequence = 0
f.setLocalFlags(by, FlagLocalIgnored)
}
func (f *FileInfo) SetUnsupported(by ShortID) {
f.LocalFlags = FlagLocalUnsupported
f.setLocalFlags(by, FlagLocalUnsupported)
}
func (f *FileInfo) SetDeleted(by ShortID) {
f.ModifiedBy = by
f.Deleted = true
f.Version = f.Version.Update(by)
f.ModifiedS = time.Now().Unix()
f.setNoContent()
}
func (f *FileInfo) setLocalFlags(by ShortID, flags uint32) {
f.RawInvalid = false
f.LocalFlags = flags
f.ModifiedBy = by
f.setNoContent()
}
func (f *FileInfo) setNoContent() {
f.Blocks = nil
f.Sequence = 0
f.BlocksHash = nil
f.Size = 0
}
func (b BlockInfo) String() string {