chore(db): use one SQLite database per folder (#10042)

This changes the database structure to use one database per folder, with
a small main database to coordinate. Reverts the prior change to buffer
all files in memory when pulling, meaning there is now a phase where the
WAL file will grow significantly, at least for initial sync of folders
with many directories.

---------

Co-authored-by: bt90 <btom1990@googlemail.com>
This commit is contained in:
Jakob Borg
2025-04-06 05:30:43 -07:00
committed by GitHub
parent 7d51b1b620
commit cf1cf85ce6
32 changed files with 1696 additions and 1219 deletions

View File

@@ -36,13 +36,13 @@ type DB interface {
// required.
AllGlobalFiles(folder string) (iter.Seq[FileMetadata], func() error)
AllGlobalFilesPrefix(folder string, prefix string) (iter.Seq[FileMetadata], func() error)
AllLocalBlocksWithHash(hash []byte) (iter.Seq[BlockMapEntry], func() error)
AllLocalFiles(folder string, device protocol.DeviceID) (iter.Seq[protocol.FileInfo], func() error)
AllLocalFilesBySequence(folder string, device protocol.DeviceID, startSeq int64, limit int) (iter.Seq[protocol.FileInfo], func() error)
AllLocalFilesWithPrefix(folder string, device protocol.DeviceID, prefix string) (iter.Seq[protocol.FileInfo], func() error)
AllLocalFilesWithBlocksHash(folder string, h []byte) (iter.Seq[FileMetadata], func() error)
AllLocalFilesWithBlocksHashAnyFolder(h []byte) (iter.Seq2[string, FileMetadata], func() error)
AllNeededGlobalFiles(folder string, device protocol.DeviceID, order config.PullOrder, limit, offset int) (iter.Seq[protocol.FileInfo], func() error)
AllLocalBlocksWithHash(hash []byte) ([]BlockMapEntry, error)
AllLocalFilesWithBlocksHashAnyFolder(hash []byte) (map[string][]FileMetadata, error)
// Cleanup
DropAllFiles(folder string, device protocol.DeviceID) error

View File

@@ -67,9 +67,9 @@ func (m metricsDB) AllLocalFilesWithBlocksHash(folder string, h []byte) (iter.Se
return m.DB.AllLocalFilesWithBlocksHash(folder, h)
}
func (m metricsDB) AllLocalFilesWithBlocksHashAnyFolder(h []byte) (iter.Seq2[string, FileMetadata], func() error) {
func (m metricsDB) AllLocalFilesWithBlocksHashAnyFolder(hash []byte) (map[string][]FileMetadata, error) {
defer m.account("-", "AllLocalFilesWithBlocksHashAnyFolder")()
return m.DB.AllLocalFilesWithBlocksHashAnyFolder(h)
return m.DB.AllLocalFilesWithBlocksHashAnyFolder(hash)
}
func (m metricsDB) AllGlobalFiles(folder string) (iter.Seq[FileMetadata], func() error) {
@@ -107,7 +107,7 @@ func (m metricsDB) GetGlobalAvailability(folder, file string) ([]protocol.Device
return m.DB.GetGlobalAvailability(folder, file)
}
func (m metricsDB) AllLocalBlocksWithHash(hash []byte) (iter.Seq[BlockMapEntry], func() error) {
func (m metricsDB) AllLocalBlocksWithHash(hash []byte) ([]BlockMapEntry, error) {
defer m.account("-", "AllLocalBlocksWithHash")()
return m.DB.AllLocalBlocksWithHash(hash)
}

View File

@@ -0,0 +1,251 @@
// Copyright (C) 2025 The Syncthing Authors.
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
// You can obtain one at https://mozilla.org/MPL/2.0/.
package sqlite
import (
"database/sql"
"embed"
"io/fs"
"path/filepath"
"strconv"
"strings"
"sync"
"text/template"
"time"
"github.com/jmoiron/sqlx"
"github.com/syncthing/syncthing/lib/build"
"github.com/syncthing/syncthing/lib/protocol"
)
const currentSchemaVersion = 1
//go:embed sql/**
var embedded embed.FS
type baseDB struct {
path string
baseName string
sql *sqlx.DB
updateLock sync.Mutex
updatePoints int
checkpointsCount int
statementsMut sync.RWMutex
statements map[string]*sqlx.Stmt
tplInput map[string]any
}
func openBase(path string, maxConns int, pragmas, schemaScripts, migrationScripts []string) (*baseDB, error) {
// Open the database with options to enable foreign keys and recursive
// triggers (needed for the delete+insert triggers on row replace).
sqlDB, err := sqlx.Open(dbDriver, "file:"+path+"?"+commonOptions)
if err != nil {
return nil, wrap(err)
}
sqlDB.SetMaxOpenConns(maxConns)
for _, pragma := range pragmas {
if _, err := sqlDB.Exec("PRAGMA " + pragma); err != nil {
return nil, wrap(err, "PRAGMA "+pragma)
}
}
db := &baseDB{
path: path,
baseName: filepath.Base(path),
sql: sqlDB,
statements: make(map[string]*sqlx.Stmt),
}
for _, script := range schemaScripts {
if err := db.runScripts(script); err != nil {
return nil, wrap(err)
}
}
ver, _ := db.getAppliedSchemaVersion()
if ver.SchemaVersion > 0 {
filter := func(scr string) bool {
scr = filepath.Base(scr)
nstr, _, ok := strings.Cut(scr, "-")
if !ok {
return false
}
n, err := strconv.ParseInt(nstr, 10, 32)
if err != nil {
return false
}
return int(n) > ver.SchemaVersion
}
for _, script := range migrationScripts {
if err := db.runScripts(script, filter); err != nil {
return nil, wrap(err)
}
}
}
// Set the current schema version, if not already set
if err := db.setAppliedSchemaVersion(currentSchemaVersion); err != nil {
return nil, wrap(err)
}
db.tplInput = map[string]any{
"FlagLocalUnsupported": protocol.FlagLocalUnsupported,
"FlagLocalIgnored": protocol.FlagLocalIgnored,
"FlagLocalMustRescan": protocol.FlagLocalMustRescan,
"FlagLocalReceiveOnly": protocol.FlagLocalReceiveOnly,
"FlagLocalGlobal": protocol.FlagLocalGlobal,
"FlagLocalNeeded": protocol.FlagLocalNeeded,
"SyncthingVersion": build.LongVersion,
}
return db, nil
}
func (s *baseDB) Close() error {
s.updateLock.Lock()
s.statementsMut.Lock()
defer s.updateLock.Unlock()
defer s.statementsMut.Unlock()
for _, stmt := range s.statements {
stmt.Close()
}
return wrap(s.sql.Close())
}
var tplFuncs = template.FuncMap{
"or": func(vs ...int) int {
v := vs[0]
for _, ov := range vs[1:] {
v |= ov
}
return v
},
}
// stmt returns a prepared statement for the given SQL string, after
// applying local template expansions. The statement is cached.
func (s *baseDB) stmt(tpl string) stmt {
tpl = strings.TrimSpace(tpl)
// Fast concurrent lookup of cached statement
s.statementsMut.RLock()
stmt, ok := s.statements[tpl]
s.statementsMut.RUnlock()
if ok {
return stmt
}
// On miss, take the full lock, check again
s.statementsMut.Lock()
defer s.statementsMut.Unlock()
stmt, ok = s.statements[tpl]
if ok {
return stmt
}
// Apply template expansions
var sb strings.Builder
compTpl := template.Must(template.New("tpl").Funcs(tplFuncs).Parse(tpl))
if err := compTpl.Execute(&sb, s.tplInput); err != nil {
panic("bug: bad template: " + err.Error())
}
// Prepare and cache
stmt, err := s.sql.Preparex(sb.String())
if err != nil {
return failedStmt{err}
}
s.statements[tpl] = stmt
return stmt
}
type stmt interface {
Exec(args ...any) (sql.Result, error)
Get(dest any, args ...any) error
Queryx(args ...any) (*sqlx.Rows, error)
Select(dest any, args ...any) error
}
type failedStmt struct {
err error
}
func (f failedStmt) Exec(_ ...any) (sql.Result, error) { return nil, f.err }
func (f failedStmt) Get(_ any, _ ...any) error { return f.err }
func (f failedStmt) Queryx(_ ...any) (*sqlx.Rows, error) { return nil, f.err }
func (f failedStmt) Select(_ any, _ ...any) error { return f.err }
func (s *baseDB) runScripts(glob string, filter ...func(s string) bool) error {
scripts, err := fs.Glob(embedded, glob)
if err != nil {
return wrap(err)
}
tx, err := s.sql.Begin()
if err != nil {
return wrap(err)
}
defer tx.Rollback() //nolint:errcheck
nextScript:
for _, scr := range scripts {
for _, fn := range filter {
if !fn(scr) {
l.Debugln(s.baseName, "skipping script", scr)
continue nextScript
}
}
l.Debugln(s.baseName, "executing script", scr)
bs, err := fs.ReadFile(embedded, scr)
if err != nil {
return wrap(err, scr)
}
// SQLite requires one statement per exec, so we split the init
// files on lines containing only a semicolon and execute them
// separately. We require it on a separate line because there are
// also statement-internal semicolons in the triggers.
for _, stmt := range strings.Split(string(bs), "\n;") {
if _, err := tx.Exec(stmt); err != nil {
return wrap(err, stmt)
}
}
}
return wrap(tx.Commit())
}
type schemaVersion struct {
SchemaVersion int
AppliedAt int64
SyncthingVersion string
}
func (s *schemaVersion) AppliedTime() time.Time {
return time.Unix(0, s.AppliedAt)
}
func (s *baseDB) setAppliedSchemaVersion(ver int) error {
_, err := s.stmt(`
INSERT OR IGNORE INTO schemamigrations (schema_version, applied_at, syncthing_version)
VALUES (?, ?, ?)
`).Exec(ver, time.Now().UnixNano(), build.LongVersion)
return wrap(err)
}
func (s *baseDB) getAppliedSchemaVersion() (schemaVersion, error) {
var v schemaVersion
err := s.stmt(`
SELECT schema_version as schemaversion, applied_at as appliedat, syncthing_version as syncthingversion FROM schemamigrations
ORDER BY schema_version DESC
LIMIT 1
`).Get(&v)
return v, wrap(err)
}

View File

@@ -1,88 +0,0 @@
// Copyright (C) 2025 The Syncthing Authors.
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
// You can obtain one at https://mozilla.org/MPL/2.0/.
package sqlite
import (
"sync"
"time"
"github.com/jmoiron/sqlx"
"github.com/syncthing/syncthing/internal/db"
"github.com/syncthing/syncthing/lib/protocol"
"github.com/thejerf/suture/v4"
)
type DB struct {
sql *sqlx.DB
localDeviceIdx int64
deleteRetention time.Duration
updateLock sync.Mutex
updatePoints int
statementsMut sync.RWMutex
statements map[string]*sqlx.Stmt
tplInput map[string]any
}
var _ db.DB = (*DB)(nil)
type Option func(*DB)
func WithDeleteRetention(d time.Duration) Option {
return func(s *DB) {
s.deleteRetention = d
}
}
func (s *DB) Close() error {
s.updateLock.Lock()
s.statementsMut.Lock()
defer s.updateLock.Unlock()
defer s.statementsMut.Unlock()
for _, stmt := range s.statements {
stmt.Close()
}
return wrap(s.sql.Close())
}
func (s *DB) Service(maintenanceInterval time.Duration) suture.Service {
return newService(s, maintenanceInterval)
}
func (s *DB) ListFolders() ([]string, error) {
var res []string
err := s.stmt(`
SELECT folder_id FROM folders
ORDER BY folder_id
`).Select(&res)
return res, wrap(err)
}
func (s *DB) ListDevicesForFolder(folder string) ([]protocol.DeviceID, error) {
var res []string
err := s.stmt(`
SELECT d.device_id FROM counts s
INNER JOIN folders o ON o.idx = s.folder_idx
INNER JOIN devices d ON d.idx = s.device_idx
WHERE o.folder_id = ? AND s.count > 0 AND s.device_idx != {{.LocalDeviceIdx}}
GROUP BY d.device_id
ORDER BY d.device_id
`).Select(&res, folder)
if err != nil {
return nil, wrap(err)
}
devs := make([]protocol.DeviceID, len(res))
for i, s := range res {
devs[i], err = protocol.DeviceIDFromString(s)
if err != nil {
return nil, wrap(err)
}
}
return devs, nil
}

View File

@@ -0,0 +1,391 @@
// Copyright (C) 2025 The Syncthing Authors.
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
// You can obtain one at https://mozilla.org/MPL/2.0/.
package sqlite
import (
"database/sql"
"errors"
"fmt"
"iter"
"path/filepath"
"time"
"github.com/syncthing/syncthing/internal/db"
"github.com/syncthing/syncthing/internal/itererr"
"github.com/syncthing/syncthing/lib/config"
"github.com/syncthing/syncthing/lib/protocol"
)
var errNoSuchFolder = errors.New("no such folder")
func (s *DB) getFolderDB(folder string, create bool) (*folderDB, error) {
// Check for an already open database
s.folderDBsMut.RLock()
fdb, ok := s.folderDBs[folder]
s.folderDBsMut.RUnlock()
if ok {
return fdb, nil
}
// Check for an existing folder. If we're not supposed to create the
// folder, we don't move on if it doesn't already have an ID.
var idx int64
if err := s.stmt(`
SELECT idx FROM folders
WHERE folder_id = ?
`).Get(&idx, folder); err != nil && !errors.Is(err, sql.ErrNoRows) {
return nil, wrap(err)
}
if idx == 0 && !create {
return nil, errNoSuchFolder
}
// Create a folder ID and database if it does not already exist
s.folderDBsMut.Lock()
defer s.folderDBsMut.Unlock()
if fdb, ok := s.folderDBs[folder]; ok {
return fdb, nil
}
var err error
if idx == 0 {
// First time we want to access this folder, need to create a new
// folder ID
idx, err = s.folderIdxLocked(folder)
if err != nil {
return nil, err
}
}
name := fmt.Sprintf("folder.%04x.db", idx)
path := filepath.Join(s.pathBase, name)
fdb, err = s.folderDBOpener(folder, path, s.deleteRetention)
if err != nil {
return nil, err
}
s.folderDBs[folder] = fdb
return fdb, nil
}
func (s *DB) Update(folder string, device protocol.DeviceID, fs []protocol.FileInfo) error {
fdb, err := s.getFolderDB(folder, true)
if err != nil {
return err
}
return fdb.Update(device, fs)
}
func (s *DB) GetDeviceFile(folder string, device protocol.DeviceID, file string) (protocol.FileInfo, bool, error) {
fdb, err := s.getFolderDB(folder, false)
if errors.Is(err, errNoSuchFolder) {
return protocol.FileInfo{}, false, nil
}
if err != nil {
return protocol.FileInfo{}, false, err
}
return fdb.GetDeviceFile(device, file)
}
func (s *DB) GetGlobalAvailability(folder, file string) ([]protocol.DeviceID, error) {
fdb, err := s.getFolderDB(folder, false)
if errors.Is(err, errNoSuchFolder) {
return nil, nil
}
if err != nil {
return nil, err
}
return fdb.GetGlobalAvailability(file)
}
func (s *DB) GetGlobalFile(folder string, file string) (protocol.FileInfo, bool, error) {
fdb, err := s.getFolderDB(folder, false)
if errors.Is(err, errNoSuchFolder) {
return protocol.FileInfo{}, false, nil
}
if err != nil {
return protocol.FileInfo{}, false, err
}
return fdb.GetGlobalFile(file)
}
func (s *DB) AllGlobalFiles(folder string) (iter.Seq[db.FileMetadata], func() error) {
fdb, err := s.getFolderDB(folder, false)
if errors.Is(err, errNoSuchFolder) {
return func(yield func(db.FileMetadata) bool) {}, func() error { return nil }
}
if err != nil {
return func(yield func(db.FileMetadata) bool) {}, func() error { return err }
}
return fdb.AllGlobalFiles()
}
func (s *DB) AllGlobalFilesPrefix(folder string, prefix string) (iter.Seq[db.FileMetadata], func() error) {
fdb, err := s.getFolderDB(folder, false)
if errors.Is(err, errNoSuchFolder) {
return func(yield func(db.FileMetadata) bool) {}, func() error { return nil }
}
if err != nil {
return func(yield func(db.FileMetadata) bool) {}, func() error { return err }
}
return fdb.AllGlobalFilesPrefix(prefix)
}
func (s *DB) AllLocalBlocksWithHash(hash []byte) ([]db.BlockMapEntry, error) {
var entries []db.BlockMapEntry
err := s.forEachFolder(func(fdb *folderDB) error {
es, err := itererr.Collect(fdb.AllLocalBlocksWithHash(hash))
entries = append(entries, es...)
return err
})
return entries, err
}
func (s *DB) AllLocalFilesWithBlocksHashAnyFolder(hash []byte) (map[string][]db.FileMetadata, error) {
res := make(map[string][]db.FileMetadata)
err := s.forEachFolder(func(fdb *folderDB) error {
files, err := itererr.Collect(fdb.AllLocalFilesWithBlocksHash(hash))
res[fdb.folderID] = files
return err
})
return res, err
}
func (s *DB) AllLocalFiles(folder string, device protocol.DeviceID) (iter.Seq[protocol.FileInfo], func() error) {
fdb, err := s.getFolderDB(folder, false)
if errors.Is(err, errNoSuchFolder) {
return func(yield func(protocol.FileInfo) bool) {}, func() error { return nil }
}
if err != nil {
return func(yield func(protocol.FileInfo) bool) {}, func() error { return err }
}
return fdb.AllLocalFiles(device)
}
func (s *DB) AllLocalFilesBySequence(folder string, device protocol.DeviceID, startSeq int64, limit int) (iter.Seq[protocol.FileInfo], func() error) {
fdb, err := s.getFolderDB(folder, false)
if errors.Is(err, errNoSuchFolder) {
return func(yield func(protocol.FileInfo) bool) {}, func() error { return nil }
}
if err != nil {
return func(yield func(protocol.FileInfo) bool) {}, func() error { return err }
}
return fdb.AllLocalFilesBySequence(device, startSeq, limit)
}
func (s *DB) AllLocalFilesWithPrefix(folder string, device protocol.DeviceID, prefix string) (iter.Seq[protocol.FileInfo], func() error) {
fdb, err := s.getFolderDB(folder, false)
if errors.Is(err, errNoSuchFolder) {
return func(yield func(protocol.FileInfo) bool) {}, func() error { return nil }
}
if err != nil {
return func(yield func(protocol.FileInfo) bool) {}, func() error { return err }
}
return fdb.AllLocalFilesWithPrefix(device, prefix)
}
func (s *DB) AllLocalFilesWithBlocksHash(folder string, h []byte) (iter.Seq[db.FileMetadata], func() error) {
fdb, err := s.getFolderDB(folder, false)
if errors.Is(err, errNoSuchFolder) {
return func(yield func(db.FileMetadata) bool) {}, func() error { return nil }
}
if err != nil {
return func(yield func(db.FileMetadata) bool) {}, func() error { return err }
}
return fdb.AllLocalFilesWithBlocksHash(h)
}
func (s *DB) AllNeededGlobalFiles(folder string, device protocol.DeviceID, order config.PullOrder, limit, offset int) (iter.Seq[protocol.FileInfo], func() error) {
fdb, err := s.getFolderDB(folder, false)
if errors.Is(err, errNoSuchFolder) {
return func(yield func(protocol.FileInfo) bool) {}, func() error { return nil }
}
if err != nil {
return func(yield func(protocol.FileInfo) bool) {}, func() error { return err }
}
return fdb.AllNeededGlobalFiles(device, order, limit, offset)
}
func (s *DB) DropAllFiles(folder string, device protocol.DeviceID) error {
fdb, err := s.getFolderDB(folder, false)
if errors.Is(err, errNoSuchFolder) {
return nil
}
if err != nil {
return err
}
return fdb.DropAllFiles(device)
}
func (s *DB) DropFilesNamed(folder string, device protocol.DeviceID, names []string) error {
fdb, err := s.getFolderDB(folder, false)
if errors.Is(err, errNoSuchFolder) {
return nil
}
if err != nil {
return err
}
return fdb.DropFilesNamed(device, names)
}
func (s *DB) ListDevicesForFolder(folder string) ([]protocol.DeviceID, error) {
fdb, err := s.getFolderDB(folder, false)
if errors.Is(err, errNoSuchFolder) {
return nil, nil
}
if err != nil {
return nil, err
}
return fdb.ListDevicesForFolder()
}
func (s *DB) RemoteSequences(folder string) (map[protocol.DeviceID]int64, error) {
fdb, err := s.getFolderDB(folder, false)
if errors.Is(err, errNoSuchFolder) {
return nil, nil
}
if err != nil {
return nil, err
}
return fdb.RemoteSequences()
}
func (s *DB) CountGlobal(folder string) (db.Counts, error) {
fdb, err := s.getFolderDB(folder, false)
if errors.Is(err, errNoSuchFolder) {
return db.Counts{}, nil
}
if err != nil {
return db.Counts{}, err
}
return fdb.CountGlobal()
}
func (s *DB) CountLocal(folder string, device protocol.DeviceID) (db.Counts, error) {
fdb, err := s.getFolderDB(folder, false)
if errors.Is(err, errNoSuchFolder) {
return db.Counts{}, nil
}
if err != nil {
return db.Counts{}, err
}
return fdb.CountLocal(device)
}
func (s *DB) CountNeed(folder string, device protocol.DeviceID) (db.Counts, error) {
fdb, err := s.getFolderDB(folder, false)
if errors.Is(err, errNoSuchFolder) {
return db.Counts{}, nil
}
if err != nil {
return db.Counts{}, err
}
return fdb.CountNeed(device)
}
func (s *DB) CountReceiveOnlyChanged(folder string) (db.Counts, error) {
fdb, err := s.getFolderDB(folder, false)
if errors.Is(err, errNoSuchFolder) {
return db.Counts{}, nil
}
if err != nil {
return db.Counts{}, err
}
return fdb.CountReceiveOnlyChanged()
}
func (s *DB) DropAllIndexIDs() error {
return s.forEachFolder(func(fdb *folderDB) error {
return fdb.DropAllIndexIDs()
})
}
func (s *DB) GetIndexID(folder string, device protocol.DeviceID) (protocol.IndexID, error) {
fdb, err := s.getFolderDB(folder, true)
if err != nil {
return 0, err
}
return fdb.GetIndexID(device)
}
func (s *DB) SetIndexID(folder string, device protocol.DeviceID, id protocol.IndexID) error {
fdb, err := s.getFolderDB(folder, true)
if err != nil {
return err
}
return fdb.SetIndexID(device, id)
}
func (s *DB) GetDeviceSequence(folder string, device protocol.DeviceID) (int64, error) {
fdb, err := s.getFolderDB(folder, false)
if errors.Is(err, errNoSuchFolder) {
return 0, nil
}
if err != nil {
return 0, err
}
return fdb.GetDeviceSequence(device)
}
func (s *DB) DeleteMtime(folder, name string) error {
fdb, err := s.getFolderDB(folder, false)
if errors.Is(err, errNoSuchFolder) {
return nil
}
if err != nil {
return err
}
return fdb.DeleteMtime(name)
}
func (s *DB) GetMtime(folder, name string) (ondisk, virtual time.Time) {
fdb, err := s.getFolderDB(folder, false)
if errors.Is(err, errNoSuchFolder) {
return time.Time{}, time.Time{}
}
if err != nil {
return time.Time{}, time.Time{}
}
return fdb.GetMtime(name)
}
func (s *DB) PutMtime(folder, name string, ondisk, virtual time.Time) error {
fdb, err := s.getFolderDB(folder, true)
if err != nil {
return err
}
return fdb.PutMtime(name, ondisk, virtual)
}
func (s *DB) DropDevice(device protocol.DeviceID) error {
return s.forEachFolder(func(fdb *folderDB) error {
return fdb.DropDevice(device)
})
}
// forEachFolder runs the function for each currently open folderDB,
// returning the first error that was encountered.
func (s *DB) forEachFolder(fn func(fdb *folderDB) error) error {
folders, err := s.ListFolders()
if err != nil {
return err
}
var firstError error
for _, folder := range folders {
fdb, err := s.getFolderDB(folder, false)
if err != nil {
if firstError == nil {
firstError = err
}
continue
}
if err := fn(fdb); err != nil && firstError == nil {
firstError = err
}
}
return firstError
}

View File

@@ -13,7 +13,7 @@ import (
"github.com/syncthing/syncthing/internal/db"
)
func (s *DB) GetKV(key string) ([]byte, error) {
func (s *baseDB) GetKV(key string) ([]byte, error) {
var val []byte
if err := s.stmt(`
SELECT value FROM kv
@@ -24,7 +24,7 @@ func (s *DB) GetKV(key string) ([]byte, error) {
return val, nil
}
func (s *DB) PutKV(key string, val []byte) error {
func (s *baseDB) PutKV(key string, val []byte) error {
s.updateLock.Lock()
defer s.updateLock.Unlock()
_, err := s.stmt(`
@@ -34,7 +34,7 @@ func (s *DB) PutKV(key string, val []byte) error {
return wrap(err)
}
func (s *DB) DeleteKV(key string) error {
func (s *baseDB) DeleteKV(key string) error {
s.updateLock.Lock()
defer s.updateLock.Unlock()
_, err := s.stmt(`
@@ -43,7 +43,7 @@ func (s *DB) DeleteKV(key string) error {
return wrap(err)
}
func (s *DB) PrefixKV(prefix string) (iter.Seq[db.KeyValue], func() error) {
func (s *baseDB) PrefixKV(prefix string) (iter.Seq[db.KeyValue], func() error) {
var rows *sqlx.Rows
var err error
if prefix == "" {

View File

@@ -9,8 +9,6 @@ package sqlite
import (
"testing"
"github.com/syncthing/syncthing/internal/db"
"github.com/syncthing/syncthing/internal/itererr"
"github.com/syncthing/syncthing/lib/protocol"
)
@@ -52,7 +50,7 @@ func TestBlocks(t *testing.T) {
// Search for blocks
vals, err := itererr.Collect(db.AllLocalBlocksWithHash([]byte{1, 2, 3}))
vals, err := db.AllLocalBlocksWithHash([]byte{1, 2, 3})
if err != nil {
t.Fatal(err)
}
@@ -66,27 +64,23 @@ func TestBlocks(t *testing.T) {
// Get FileInfos for those blocks
found := 0
it, errFn := db.AllLocalFilesWithBlocksHashAnyFolder(vals[0].BlocklistHash)
for folder, fileInfo := range it {
if folder != folderID {
t.Fatal("should be same folder")
}
if fileInfo.Name != "file1" {
t.Fatal("should be file1")
}
found++
}
if err := errFn(); err != nil {
res, err := db.AllLocalFilesWithBlocksHashAnyFolder(vals[0].BlocklistHash)
if err != nil {
t.Fatal(err)
}
if found != 1 {
if len(res) != 1 {
t.Fatal("should return one folder")
}
if len(res[folderID]) != 1 {
t.Fatal("should find one file")
}
if res[folderID][0].Name != "file1" {
t.Fatal("should be file1")
}
// Get the other blocks
vals, err = itererr.Collect(db.AllLocalBlocksWithHash([]byte{3, 4, 5}))
vals, err = db.AllLocalBlocksWithHash([]byte{3, 4, 5})
if err != nil {
t.Fatal(err)
}
@@ -125,7 +119,10 @@ func TestBlocksDeleted(t *testing.T) {
// We should find one entry for the block hash
search := file.Blocks[0].Hash
es := mustCollect[db.BlockMapEntry](t)(sdb.AllLocalBlocksWithHash(search))
es, err := sdb.AllLocalBlocksWithHash(search)
if err != nil {
t.Fatal(err)
}
if len(es) != 1 {
t.Fatal("expected one hit")
}
@@ -137,13 +134,17 @@ func TestBlocksDeleted(t *testing.T) {
}
// Searching for the old hash should yield no hits
if hits := mustCollect[db.BlockMapEntry](t)(sdb.AllLocalBlocksWithHash(search)); len(hits) != 0 {
if hits, err := sdb.AllLocalBlocksWithHash(search); err != nil {
t.Fatal(err)
} else if len(hits) != 0 {
t.Log(hits)
t.Error("expected no hits")
}
// Searching for the new hash should yield one hits
if hits := mustCollect[db.BlockMapEntry](t)(sdb.AllLocalBlocksWithHash(file.Blocks[0].Hash)); len(hits) != 1 {
if hits, err := sdb.AllLocalBlocksWithHash(file.Blocks[0].Hash); err != nil {
t.Fatal(err)
} else if len(hits) != 1 {
t.Log(hits)
t.Error("expected one hit")
}

View File

@@ -1,54 +0,0 @@
// Copyright (C) 2025 The Syncthing Authors.
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
// You can obtain one at https://mozilla.org/MPL/2.0/.
package sqlite
import (
"time"
)
func (s *DB) GetMtime(folder, name string) (ondisk, virtual time.Time) {
var res struct {
Ondisk int64
Virtual int64
}
if err := s.stmt(`
SELECT m.ondisk, m.virtual FROM mtimes m
INNER JOIN folders o ON o.idx = m.folder_idx
WHERE o.folder_id = ? AND m.name = ?
`).Get(&res, folder, name); err != nil {
return time.Time{}, time.Time{}
}
return time.Unix(0, res.Ondisk), time.Unix(0, res.Virtual)
}
func (s *DB) PutMtime(folder, name string, ondisk, virtual time.Time) error {
s.updateLock.Lock()
defer s.updateLock.Unlock()
folderIdx, err := s.folderIdxLocked(folder)
if err != nil {
return wrap(err)
}
_, err = s.stmt(`
INSERT OR REPLACE INTO mtimes (folder_idx, name, ondisk, virtual)
VALUES (?, ?, ?, ?)
`).Exec(folderIdx, name, ondisk.UnixNano(), virtual.UnixNano())
return wrap(err)
}
func (s *DB) DeleteMtime(folder, name string) error {
s.updateLock.Lock()
defer s.updateLock.Unlock()
folderIdx, err := s.folderIdxLocked(folder)
if err != nil {
return wrap(err)
}
_, err = s.stmt(`
DELETE FROM mtimes
WHERE folder_idx = ? AND name = ?
`).Exec(folderIdx, name)
return wrap(err)
}

View File

@@ -7,57 +7,104 @@
package sqlite
import (
"database/sql"
"os"
"path/filepath"
"strconv"
"strings"
"text/template"
"sync"
"time"
"github.com/jmoiron/sqlx"
"github.com/syncthing/syncthing/lib/build"
"github.com/syncthing/syncthing/lib/protocol"
"github.com/syncthing/syncthing/internal/db"
)
const maxDBConns = 128
const maxDBConns = 16
type DB struct {
pathBase string
deleteRetention time.Duration
*baseDB
folderDBsMut sync.RWMutex
folderDBs map[string]*folderDB
folderDBOpener func(folder, path string, deleteRetention time.Duration) (*folderDB, error)
}
var _ db.DB = (*DB)(nil)
type Option func(*DB)
func WithDeleteRetention(d time.Duration) Option {
return func(s *DB) {
s.deleteRetention = d
}
}
func Open(path string, opts ...Option) (*DB, error) {
// Open the database with options to enable foreign keys and recursive
// triggers (needed for the delete+insert triggers on row replace).
sqlDB, err := sqlx.Open(dbDriver, "file:"+path+"?"+commonOptions)
pragmas := []string{
"journal_mode = WAL",
"optimize = 0x10002",
"auto_vacuum = INCREMENTAL",
"default_temp_store = MEMORY",
"temp_store = MEMORY",
}
schemas := []string{
"sql/schema/common/*",
"sql/schema/main/*",
}
os.MkdirAll(path, 0o700)
mainPath := filepath.Join(path, "main.db")
mainBase, err := openBase(mainPath, maxDBConns, pragmas, schemas, nil)
if err != nil {
return nil, wrap(err)
return nil, err
}
sqlDB.SetMaxOpenConns(maxDBConns)
if _, err := sqlDB.Exec(`PRAGMA journal_mode = WAL`); err != nil {
return nil, wrap(err, "PRAGMA journal_mode")
db := &DB{
pathBase: path,
baseDB: mainBase,
folderDBs: make(map[string]*folderDB),
folderDBOpener: openFolderDB,
}
if _, err := sqlDB.Exec(`PRAGMA optimize = 0x10002`); err != nil {
// https://www.sqlite.org/pragma.html#pragma_optimize
return nil, wrap(err, "PRAGMA optimize")
}
return openCommon(sqlDB, opts...)
return db, nil
}
// Open the database with options suitable for the migration inserts. This
// is not a safe mode of operation for normal processing, use only for bulk
// inserts with a close afterwards.
func OpenForMigration(path string) (*DB, error) {
sqlDB, err := sqlx.Open(dbDriver, "file:"+path+"?"+commonOptions)
pragmas := []string{
"journal_mode = OFF",
"default_temp_store = MEMORY",
"temp_store = MEMORY",
"foreign_keys = 0",
"synchronous = 0",
"locking_mode = EXCLUSIVE",
}
schemas := []string{
"sql/schema/common/*",
"sql/schema/main/*",
}
os.MkdirAll(path, 0o700)
mainPath := filepath.Join(path, "main.db")
mainBase, err := openBase(mainPath, 1, pragmas, schemas, nil)
if err != nil {
return nil, wrap(err, "open")
return nil, err
}
sqlDB.SetMaxOpenConns(1)
if _, err := sqlDB.Exec(`PRAGMA foreign_keys = 0`); err != nil {
return nil, wrap(err, "PRAGMA foreign_keys")
db := &DB{
pathBase: path,
baseDB: mainBase,
folderDBs: make(map[string]*folderDB),
folderDBOpener: openFolderDBForMigration,
}
if _, err := sqlDB.Exec(`PRAGMA journal_mode = OFF`); err != nil {
return nil, wrap(err, "PRAGMA journal_mode")
}
if _, err := sqlDB.Exec(`PRAGMA synchronous = 0`); err != nil {
return nil, wrap(err, "PRAGMA synchronous")
}
return openCommon(sqlDB)
// // Touch device IDs that should always exist and have a low index
// // numbers, and will never change
// db.localDeviceIdx, _ = db.deviceIdxLocked(protocol.LocalDeviceID)
// db.tplInput["LocalDeviceIdx"] = db.localDeviceIdx
return db, nil
}
func OpenTemp() (*DB, error) {
@@ -73,134 +120,12 @@ func OpenTemp() (*DB, error) {
return Open(path)
}
func openCommon(sqlDB *sqlx.DB, opts ...Option) (*DB, error) {
if _, err := sqlDB.Exec(`PRAGMA auto_vacuum = INCREMENTAL`); err != nil {
return nil, wrap(err, "PRAGMA auto_vacuum")
func (s *DB) Close() error {
s.folderDBsMut.Lock()
defer s.folderDBsMut.Unlock()
for folder, fdb := range s.folderDBs {
fdb.Close()
delete(s.folderDBs, folder)
}
if _, err := sqlDB.Exec(`PRAGMA default_temp_store = MEMORY`); err != nil {
return nil, wrap(err, "PRAGMA default_temp_store")
}
if _, err := sqlDB.Exec(`PRAGMA temp_store = MEMORY`); err != nil {
return nil, wrap(err, "PRAGMA temp_store")
}
db := &DB{
sql: sqlDB,
deleteRetention: defaultDeleteRetention,
statements: make(map[string]*sqlx.Stmt),
}
for _, opt := range opts {
opt(db)
}
if db.deleteRetention > 0 && db.deleteRetention < minDeleteRetention {
db.deleteRetention = minDeleteRetention
}
if err := db.runScripts("sql/schema/*"); err != nil {
return nil, wrap(err)
}
ver, _ := db.getAppliedSchemaVersion()
if ver.SchemaVersion > 0 {
filter := func(scr string) bool {
scr = filepath.Base(scr)
nstr, _, ok := strings.Cut(scr, "-")
if !ok {
return false
}
n, err := strconv.ParseInt(nstr, 10, 32)
if err != nil {
return false
}
return int(n) > ver.SchemaVersion
}
if err := db.runScripts("sql/migrations/*", filter); err != nil {
return nil, wrap(err)
}
}
// Touch device IDs that should always exist and have a low index
// numbers, and will never change
db.localDeviceIdx, _ = db.deviceIdxLocked(protocol.LocalDeviceID)
// Set the current schema version, if not already set
if err := db.setAppliedSchemaVersion(currentSchemaVersion); err != nil {
return nil, wrap(err)
}
db.tplInput = map[string]any{
"FlagLocalUnsupported": protocol.FlagLocalUnsupported,
"FlagLocalIgnored": protocol.FlagLocalIgnored,
"FlagLocalMustRescan": protocol.FlagLocalMustRescan,
"FlagLocalReceiveOnly": protocol.FlagLocalReceiveOnly,
"FlagLocalGlobal": protocol.FlagLocalGlobal,
"FlagLocalNeeded": protocol.FlagLocalNeeded,
"LocalDeviceIdx": db.localDeviceIdx,
"SyncthingVersion": build.LongVersion,
}
return db, nil
return wrap(s.baseDB.Close())
}
var tplFuncs = template.FuncMap{
"or": func(vs ...int) int {
v := vs[0]
for _, ov := range vs[1:] {
v |= ov
}
return v
},
}
// stmt returns a prepared statement for the given SQL string, after
// applying local template expansions. The statement is cached.
func (s *DB) stmt(tpl string) stmt {
tpl = strings.TrimSpace(tpl)
// Fast concurrent lookup of cached statement
s.statementsMut.RLock()
stmt, ok := s.statements[tpl]
s.statementsMut.RUnlock()
if ok {
return stmt
}
// On miss, take the full lock, check again
s.statementsMut.Lock()
defer s.statementsMut.Unlock()
stmt, ok = s.statements[tpl]
if ok {
return stmt
}
// Apply template expansions
var sb strings.Builder
compTpl := template.Must(template.New("tpl").Funcs(tplFuncs).Parse(tpl))
if err := compTpl.Execute(&sb, s.tplInput); err != nil {
panic("bug: bad template: " + err.Error())
}
// Prepare and cache
stmt, err := s.sql.Preparex(sb.String())
if err != nil {
return failedStmt{err}
}
s.statements[tpl] = stmt
return stmt
}
type stmt interface {
Exec(args ...any) (sql.Result, error)
Get(dest any, args ...any) error
Queryx(args ...any) (*sqlx.Rows, error)
Select(dest any, args ...any) error
}
type failedStmt struct {
err error
}
func (f failedStmt) Exec(_ ...any) (sql.Result, error) { return nil, f.err }
func (f failedStmt) Get(_ any, _ ...any) error { return f.err }
func (f failedStmt) Queryx(_ ...any) (*sqlx.Rows, error) { return nil, f.err }
func (f failedStmt) Select(_ any, _ ...any) error { return f.err }

View File

@@ -1,88 +0,0 @@
// Copyright (C) 2025 The Syncthing Authors.
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
// You can obtain one at https://mozilla.org/MPL/2.0/.
package sqlite
import (
"embed"
"io/fs"
"strings"
"time"
"github.com/syncthing/syncthing/lib/build"
)
const currentSchemaVersion = 1
//go:embed sql/**
var embedded embed.FS
func (s *DB) runScripts(glob string, filter ...func(s string) bool) error {
scripts, err := fs.Glob(embedded, glob)
if err != nil {
return wrap(err)
}
tx, err := s.sql.Begin()
if err != nil {
return wrap(err)
}
defer tx.Rollback() //nolint:errcheck
nextScript:
for _, scr := range scripts {
for _, fn := range filter {
if !fn(scr) {
l.Debugln("Skipping script", scr)
continue nextScript
}
}
l.Debugln("Executing script", scr)
bs, err := fs.ReadFile(embedded, scr)
if err != nil {
return wrap(err, scr)
}
// SQLite requires one statement per exec, so we split the init
// files on lines containing only a semicolon and execute them
// separately. We require it on a separate line because there are
// also statement-internal semicolons in the triggers.
for _, stmt := range strings.Split(string(bs), "\n;") {
if _, err := tx.Exec(stmt); err != nil {
return wrap(err, stmt)
}
}
}
return wrap(tx.Commit())
}
type schemaVersion struct {
SchemaVersion int
AppliedAt int64
SyncthingVersion string
}
func (s *schemaVersion) AppliedTime() time.Time {
return time.Unix(0, s.AppliedAt)
}
func (s *DB) setAppliedSchemaVersion(ver int) error {
_, err := s.stmt(`
INSERT OR IGNORE INTO schemamigrations (schema_version, applied_at, syncthing_version)
VALUES (?, ?, ?)
`).Exec(ver, time.Now().UnixNano(), build.LongVersion)
return wrap(err)
}
func (s *DB) getAppliedSchemaVersion() (schemaVersion, error) {
var v schemaVersion
err := s.stmt(`
SELECT schema_version as schemaversion, applied_at as appliedat, syncthing_version as syncthingversion FROM schemamigrations
ORDER BY schema_version DESC
LIMIT 1
`).Get(&v)
return v, wrap(err)
}

View File

@@ -11,7 +11,9 @@ import (
"fmt"
"time"
"github.com/jmoiron/sqlx"
"github.com/syncthing/syncthing/internal/db"
"github.com/thejerf/suture/v4"
)
const (
@@ -21,6 +23,10 @@ const (
minDeleteRetention = 24 * time.Hour
)
func (s *DB) Service(maintenanceInterval time.Duration) suture.Service {
return newService(s, maintenanceInterval)
}
type Service struct {
sdb *DB
maintenanceInterval time.Duration
@@ -80,14 +86,25 @@ func (s *Service) periodic(ctx context.Context) error {
t1 := time.Now()
defer func() { l.Debugln("Periodic done in", time.Since(t1), "+", t1.Sub(t0)) }()
if err := s.garbageCollectOldDeletedLocked(); err != nil {
return wrap(err)
}
if err := s.garbageCollectBlocklistsAndBlocksLocked(ctx); err != nil {
return wrap(err)
}
tidy(ctx, s.sdb.sql)
conn, err := s.sdb.sql.Conn(ctx)
return wrap(s.sdb.forEachFolder(func(fdb *folderDB) error {
fdb.updateLock.Lock()
defer fdb.updateLock.Unlock()
if err := garbageCollectOldDeletedLocked(fdb); err != nil {
return wrap(err)
}
if err := garbageCollectBlocklistsAndBlocksLocked(ctx, fdb); err != nil {
return wrap(err)
}
tidy(ctx, fdb.sql)
return nil
}))
}
func tidy(ctx context.Context, db *sqlx.DB) error {
conn, err := db.Conn(ctx)
if err != nil {
return wrap(err)
}
@@ -95,35 +112,34 @@ func (s *Service) periodic(ctx context.Context) error {
_, _ = conn.ExecContext(ctx, `ANALYZE`)
_, _ = conn.ExecContext(ctx, `PRAGMA optimize`)
_, _ = conn.ExecContext(ctx, `PRAGMA incremental_vacuum`)
_, _ = conn.ExecContext(ctx, `PRAGMA journal_size_limit = 67108864`)
_, _ = conn.ExecContext(ctx, `PRAGMA journal_size_limit = 8388608`)
_, _ = conn.ExecContext(ctx, `PRAGMA wal_checkpoint(TRUNCATE)`)
return nil
}
func (s *Service) garbageCollectOldDeletedLocked() error {
if s.sdb.deleteRetention <= 0 {
l.Debugln("Delete retention is infinite, skipping cleanup")
func garbageCollectOldDeletedLocked(fdb *folderDB) error {
if fdb.deleteRetention <= 0 {
l.Debugln(fdb.baseName, "delete retention is infinite, skipping cleanup")
return nil
}
// Remove deleted files that are marked as not needed (we have processed
// them) and they were deleted more than MaxDeletedFileAge ago.
l.Debugln("Forgetting deleted files older than", s.sdb.deleteRetention)
res, err := s.sdb.stmt(`
l.Debugln(fdb.baseName, "forgetting deleted files older than", fdb.deleteRetention)
res, err := fdb.stmt(`
DELETE FROM files
WHERE deleted AND modified < ? AND local_flags & {{.FlagLocalNeeded}} == 0
`).Exec(time.Now().Add(-s.sdb.deleteRetention).UnixNano())
`).Exec(time.Now().Add(-fdb.deleteRetention).UnixNano())
if err != nil {
return wrap(err)
}
if aff, err := res.RowsAffected(); err == nil {
l.Debugln("Removed old deleted file records:", aff)
l.Debugln(fdb.baseName, "removed old deleted file records:", aff)
}
return nil
}
func (s *Service) garbageCollectBlocklistsAndBlocksLocked(ctx context.Context) error {
func garbageCollectBlocklistsAndBlocksLocked(ctx context.Context, fdb *folderDB) error {
// Remove all blocklists not referred to by any files and, by extension,
// any blocks not referred to by a blocklist. This is an expensive
// operation when run normally, especially if there are a lot of blocks
@@ -134,7 +150,7 @@ func (s *Service) garbageCollectBlocklistsAndBlocksLocked(ctx context.Context) e
// an explicit connection and disabling foreign keys before starting the
// transaction. We make sure to clean up on the way out.
conn, err := s.sdb.sql.Connx(ctx)
conn, err := fdb.sql.Connx(ctx)
if err != nil {
return wrap(err)
}
@@ -161,7 +177,7 @@ func (s *Service) garbageCollectBlocklistsAndBlocksLocked(ctx context.Context) e
return wrap(err, "delete blocklists")
} else if shouldDebug() {
rows, err := res.RowsAffected()
l.Debugln("Blocklist GC:", rows, err)
l.Debugln(fdb.baseName, "blocklist GC:", rows, err)
}
if res, err := tx.ExecContext(ctx, `
@@ -172,7 +188,7 @@ func (s *Service) garbageCollectBlocklistsAndBlocksLocked(ctx context.Context) e
return wrap(err, "delete blocks")
} else if shouldDebug() {
rows, err := res.RowsAffected()
l.Debugln("Blocks GC:", rows, err)
l.Debugln(fdb.baseName, "blocks GC:", rows, err)
}
return wrap(tx.Commit())

View File

@@ -298,6 +298,7 @@ func TestBasics(t *testing.T) {
t.Fatal(err)
}
if len(folders) != 1 || folders[0] != folderID {
t.Log(folders)
t.Error("expected one folder")
}
})
@@ -1009,15 +1010,20 @@ func TestBlocklistGarbageCollection(t *testing.T) {
// There should exist three blockslists and six blocks
fdb, err := sdb.getFolderDB(folderID, false)
if err != nil {
t.Fatal(err)
}
var count int
if err := sdb.sql.Get(&count, `SELECT count(*) FROM blocklists`); err != nil {
if err := fdb.sql.Get(&count, `SELECT count(*) FROM blocklists`); err != nil {
t.Fatal(err)
}
if count != 3 {
t.Log(count)
t.Fatal("expected 3 blocklists")
}
if err := sdb.sql.Get(&count, `SELECT count(*) FROM blocks`); err != nil {
if err := fdb.sql.Get(&count, `SELECT count(*) FROM blocks`); err != nil {
t.Fatal(err)
}
if count != 6 {
@@ -1039,14 +1045,14 @@ func TestBlocklistGarbageCollection(t *testing.T) {
// There should exist two blockslists and four blocks
if err := sdb.sql.Get(&count, `SELECT count(*) FROM blocklists`); err != nil {
if err := fdb.sql.Get(&count, `SELECT count(*) FROM blocklists`); err != nil {
t.Fatal(err)
}
if count != 2 {
t.Log(count)
t.Error("expected 2 blocklists")
}
if err := sdb.sql.Get(&count, `SELECT count(*) FROM blocks`); err != nil {
if err := fdb.sql.Get(&count, `SELECT count(*) FROM blocks`); err != nil {
t.Fatal(err)
}
if count != 3 {
@@ -1078,7 +1084,7 @@ func TestInsertLargeFile(t *testing.T) {
// Verify all the blocks are here
for i, block := range files[0].Blocks {
bs, err := itererr.Collect(sdb.AllLocalBlocksWithHash(block.Hash))
bs, err := sdb.AllLocalBlocksWithHash(block.Hash)
if err != nil {
t.Fatal(err)
}

View File

@@ -7,477 +7,38 @@
package sqlite
import (
"cmp"
"context"
"fmt"
"os"
"runtime"
"slices"
"strings"
"github.com/jmoiron/sqlx"
"github.com/syncthing/syncthing/internal/gen/dbproto"
"github.com/syncthing/syncthing/internal/itererr"
"github.com/syncthing/syncthing/lib/osutil"
"github.com/syncthing/syncthing/lib/protocol"
"github.com/syncthing/syncthing/lib/sliceutil"
"google.golang.org/protobuf/proto"
)
const (
// Arbitrarily chosen values for checkpoint frequency....
updatePointsPerFile = 100
updatePointsPerBlock = 1
updatePointsThreshold = 250_000
)
func (s *DB) Update(folder string, device protocol.DeviceID, fs []protocol.FileInfo) error {
s.updateLock.Lock()
defer s.updateLock.Unlock()
folderIdx, err := s.folderIdxLocked(folder)
if err != nil {
return wrap(err)
}
deviceIdx, err := s.deviceIdxLocked(device)
if err != nil {
return wrap(err)
}
tx, err := s.sql.BeginTxx(context.Background(), nil)
if err != nil {
return wrap(err)
}
defer tx.Rollback() //nolint:errcheck
txp := &txPreparedStmts{Tx: tx}
//nolint:sqlclosecheck
insertFileStmt, err := txp.Preparex(`
INSERT OR REPLACE INTO files (folder_idx, device_idx, remote_sequence, name, type, modified, size, version, deleted, invalid, local_flags, blocklist_hash)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
RETURNING sequence
`)
if err != nil {
return wrap(err, "prepare insert file")
}
//nolint:sqlclosecheck
insertFileInfoStmt, err := txp.Preparex(`
INSERT INTO fileinfos (sequence, fiprotobuf)
VALUES (?, ?)
`)
if err != nil {
return wrap(err, "prepare insert fileinfo")
}
//nolint:sqlclosecheck
insertBlockListStmt, err := txp.Preparex(`
INSERT OR IGNORE INTO blocklists (blocklist_hash, blprotobuf)
VALUES (?, ?)
`)
if err != nil {
return wrap(err, "prepare insert blocklist")
}
var prevRemoteSeq int64
for i, f := range fs {
f.Name = osutil.NormalizedFilename(f.Name)
var blockshash *[]byte
if len(f.Blocks) > 0 {
f.BlocksHash = protocol.BlocksHash(f.Blocks)
blockshash = &f.BlocksHash
} else {
f.BlocksHash = nil
}
if f.Type == protocol.FileInfoTypeDirectory {
f.Size = 128 // synthetic directory size
}
// Insert the file.
//
// If it is a remote file, set remote_sequence otherwise leave it at
// null. Returns the new local sequence.
var remoteSeq *int64
if device != protocol.LocalDeviceID {
if i > 0 && f.Sequence == prevRemoteSeq {
return fmt.Errorf("duplicate remote sequence number %d", prevRemoteSeq)
}
prevRemoteSeq = f.Sequence
remoteSeq = &f.Sequence
}
var localSeq int64
if err := insertFileStmt.Get(&localSeq, folderIdx, deviceIdx, remoteSeq, f.Name, f.Type, f.ModTime().UnixNano(), f.Size, f.Version.String(), f.IsDeleted(), f.IsInvalid(), f.LocalFlags, blockshash); err != nil {
return wrap(err, "insert file")
}
if len(f.Blocks) > 0 {
// Indirect the block list
blocks := sliceutil.Map(f.Blocks, protocol.BlockInfo.ToWire)
bs, err := proto.Marshal(&dbproto.BlockList{Blocks: blocks})
if err != nil {
return wrap(err, "marshal blocklist")
}
if _, err := insertBlockListStmt.Exec(f.BlocksHash, bs); err != nil {
return wrap(err, "insert blocklist")
}
if device == protocol.LocalDeviceID {
// Insert all blocks
if err := s.insertBlocksLocked(txp, f.BlocksHash, f.Blocks); err != nil {
return wrap(err, "insert blocks")
}
}
f.Blocks = nil
}
// Insert the fileinfo
if device == protocol.LocalDeviceID {
f.Sequence = localSeq
}
bs, err := proto.Marshal(f.ToWire(true))
if err != nil {
return wrap(err, "marshal fileinfo")
}
if _, err := insertFileInfoStmt.Exec(localSeq, bs); err != nil {
return wrap(err, "insert fileinfo")
}
// Update global and need
if err := s.recalcGlobalForFile(txp, folderIdx, f.Name); err != nil {
return wrap(err)
}
}
if err := tx.Commit(); err != nil {
return wrap(err)
}
s.periodicCheckpointLocked(fs)
return nil
}
func (s *DB) DropFolder(folder string) error {
s.folderDBsMut.Lock()
defer s.folderDBsMut.Unlock()
s.updateLock.Lock()
defer s.updateLock.Unlock()
_, err := s.stmt(`
DELETE FROM folders
WHERE folder_id = ?
`).Exec(folder)
if fdb, ok := s.folderDBs[folder]; ok {
fdb.Close()
_ = os.Remove(fdb.path)
_ = os.Remove(fdb.path + "-wal")
_ = os.Remove(fdb.path + "-shm")
delete(s.folderDBs, folder)
}
return wrap(err)
}
func (s *DB) DropDevice(device protocol.DeviceID) error {
if device == protocol.LocalDeviceID {
panic("bug: cannot drop local device")
}
s.updateLock.Lock()
defer s.updateLock.Unlock()
deviceIdx, err := s.deviceIdxLocked(device)
if err != nil {
return wrap(err)
}
tx, err := s.sql.BeginTxx(context.Background(), nil)
if err != nil {
return wrap(err)
}
defer tx.Rollback() //nolint:errcheck
txp := &txPreparedStmts{Tx: tx}
// Find all folders where the device is involved
var folderIdxs []int64
if err := tx.Select(&folderIdxs, `
SELECT folder_idx
FROM counts
WHERE device_idx = ? AND count > 0
GROUP BY folder_idx
`, deviceIdx); err != nil {
return wrap(err)
}
// Drop the device, which cascades to delete all files etc for it
if _, err := tx.Exec(`DELETE FROM devices WHERE device_id = ?`, device.String()); err != nil {
return wrap(err)
}
// Recalc the globals for all affected folders
for _, idx := range folderIdxs {
if err := s.recalcGlobalForFolder(txp, idx); err != nil {
return wrap(err)
}
}
return wrap(tx.Commit())
}
func (s *DB) DropAllFiles(folder string, device protocol.DeviceID) error {
s.updateLock.Lock()
defer s.updateLock.Unlock()
// This is a two part operation, first dropping all the files and then
// recalculating the global state for the entire folder.
folderIdx, err := s.folderIdxLocked(folder)
if err != nil {
return wrap(err)
}
deviceIdx, err := s.deviceIdxLocked(device)
if err != nil {
return wrap(err)
}
tx, err := s.sql.BeginTxx(context.Background(), nil)
if err != nil {
return wrap(err)
}
defer tx.Rollback() //nolint:errcheck
txp := &txPreparedStmts{Tx: tx}
// Drop all the file entries
result, err := tx.Exec(`
DELETE FROM files
WHERE folder_idx = ? AND device_idx = ?
`, folderIdx, deviceIdx)
if err != nil {
return wrap(err)
}
if n, err := result.RowsAffected(); err == nil && n == 0 {
// The delete affected no rows, so we don't need to redo the entire
// global/need calculation.
return wrap(tx.Commit())
}
// Recalc global for the entire folder
if err := s.recalcGlobalForFolder(txp, folderIdx); err != nil {
return wrap(err)
}
return wrap(tx.Commit())
}
func (s *DB) DropFilesNamed(folder string, device protocol.DeviceID, names []string) error {
for i := range names {
names[i] = osutil.NormalizedFilename(names[i])
}
s.updateLock.Lock()
defer s.updateLock.Unlock()
folderIdx, err := s.folderIdxLocked(folder)
if err != nil {
return wrap(err)
}
deviceIdx, err := s.deviceIdxLocked(device)
if err != nil {
return wrap(err)
}
tx, err := s.sql.BeginTxx(context.Background(), nil)
if err != nil {
return wrap(err)
}
defer tx.Rollback() //nolint:errcheck
txp := &txPreparedStmts{Tx: tx}
// Drop the named files
query, args, err := sqlx.In(`
DELETE FROM files
WHERE folder_idx = ? AND device_idx = ? AND name IN (?)
`, folderIdx, deviceIdx, names)
if err != nil {
return wrap(err)
}
if _, err := tx.Exec(query, args...); err != nil {
return wrap(err)
}
// Recalc globals for the named files
for _, name := range names {
if err := s.recalcGlobalForFile(txp, folderIdx, name); err != nil {
return wrap(err)
}
}
return wrap(tx.Commit())
}
func (*DB) insertBlocksLocked(tx *txPreparedStmts, blocklistHash []byte, blocks []protocol.BlockInfo) error {
if len(blocks) == 0 {
return nil
}
bs := make([]map[string]any, len(blocks))
for i, b := range blocks {
bs[i] = map[string]any{
"hash": b.Hash,
"blocklist_hash": blocklistHash,
"idx": i,
"offset": b.Offset,
"size": b.Size,
}
}
// Very large block lists (>8000 blocks) result in "too many variables"
// error. Chunk it to a reasonable size.
for chunk := range slices.Chunk(bs, 1000) {
if _, err := tx.NamedExec(`
INSERT OR IGNORE INTO blocks (hash, blocklist_hash, idx, offset, size)
VALUES (:hash, :blocklist_hash, :idx, :offset, :size)
`, chunk); err != nil {
return wrap(err)
}
}
return nil
}
func (s *DB) recalcGlobalForFolder(txp *txPreparedStmts, folderIdx int64) error {
// Select files where there is no global, those are the ones we need to
// recalculate.
//nolint:sqlclosecheck
namesStmt, err := txp.Preparex(`
SELECT f.name FROM files f
WHERE f.folder_idx = ? AND NOT EXISTS (
SELECT 1 FROM files g
WHERE g.folder_idx = ? AND g.name = f.name AND g.local_flags & ? != 0
)
GROUP BY name
`)
if err != nil {
return wrap(err)
}
rows, err := namesStmt.Queryx(folderIdx, folderIdx, protocol.FlagLocalGlobal)
if err != nil {
return wrap(err)
}
defer rows.Close()
for rows.Next() {
var name string
if err := rows.Scan(&name); err != nil {
return wrap(err)
}
if err := s.recalcGlobalForFile(txp, folderIdx, name); err != nil {
return wrap(err)
}
}
return wrap(rows.Err())
}
func (s *DB) recalcGlobalForFile(txp *txPreparedStmts, folderIdx int64, file string) error {
//nolint:sqlclosecheck
selStmt, err := txp.Preparex(`
SELECT name, folder_idx, device_idx, sequence, modified, version, deleted, invalid, local_flags FROM files
WHERE folder_idx = ? AND name = ?
`)
if err != nil {
return wrap(err)
}
es, err := itererr.Collect(iterStructs[fileRow](selStmt.Queryx(folderIdx, file)))
if err != nil {
return wrap(err)
}
if len(es) == 0 {
// shouldn't happen
return nil
}
// Sort the entries; the global entry is at the head of the list
slices.SortFunc(es, fileRow.Compare)
// The global version is the first one in the list that is not invalid,
// or just the first one in the list if all are invalid.
var global fileRow
globIdx := slices.IndexFunc(es, func(e fileRow) bool { return !e.Invalid })
if globIdx < 0 {
globIdx = 0
}
global = es[globIdx]
// We "have" the file if the position in the list of versions is at the
// global version or better, or if the version is the same as the global
// file (we might be further down the list due to invalid flags), or if
// the global is deleted and we don't have it at all...
localIdx := slices.IndexFunc(es, func(e fileRow) bool { return e.DeviceIdx == s.localDeviceIdx })
hasLocal := localIdx >= 0 && localIdx <= globIdx || // have a better or equal version
localIdx >= 0 && es[localIdx].Version.Equal(global.Version.Vector) || // have an equal version but invalid/ignored
localIdx < 0 && global.Deleted // missing it, but the global is also deleted
// Set the global flag on the global entry. Set the need flag if the
// local device needs this file, unless it's invalid.
global.LocalFlags |= protocol.FlagLocalGlobal
if hasLocal || global.Invalid {
global.LocalFlags &= ^protocol.FlagLocalNeeded
} else {
global.LocalFlags |= protocol.FlagLocalNeeded
}
//nolint:sqlclosecheck
upStmt, err := txp.Prepare(`
UPDATE files SET local_flags = ?
WHERE folder_idx = ? AND device_idx = ? AND sequence = ?
`)
if err != nil {
return wrap(err)
}
if _, err := upStmt.Exec(global.LocalFlags, global.FolderIdx, global.DeviceIdx, global.Sequence); err != nil {
return wrap(err)
}
// Clear the need and global flags on all other entries
//nolint:sqlclosecheck
upStmt, err = txp.Prepare(`
UPDATE files SET local_flags = local_flags & ?
WHERE folder_idx = ? AND name = ? AND sequence != ? AND local_flags & ? != 0
`)
if err != nil {
return wrap(err)
}
if _, err := upStmt.Exec(^(protocol.FlagLocalNeeded | protocol.FlagLocalGlobal), folderIdx, global.Name, global.Sequence, protocol.FlagLocalNeeded|protocol.FlagLocalGlobal); err != nil {
return wrap(err)
}
return nil
}
func (s *DB) folderIdxLocked(folderID string) (int64, error) {
if _, err := s.stmt(`
INSERT OR IGNORE INTO folders(folder_id)
VALUES (?)
`).Exec(folderID); err != nil {
return 0, wrap(err)
}
var idx int64
if err := s.stmt(`
SELECT idx FROM folders
WHERE folder_id = ?
`).Get(&idx, folderID); err != nil {
return 0, wrap(err)
}
return idx, nil
}
func (s *DB) deviceIdxLocked(deviceID protocol.DeviceID) (int64, error) {
devStr := deviceID.String()
if _, err := s.stmt(`
INSERT OR IGNORE INTO devices(device_id)
VALUES (?)
`).Exec(devStr); err != nil {
return 0, wrap(err)
}
var idx int64
if err := s.stmt(`
SELECT idx FROM devices
WHERE device_id = ?
`).Get(&idx, devStr); err != nil {
return 0, wrap(err)
}
return idx, nil
func (s *DB) ListFolders() ([]string, error) {
var res []string
err := s.stmt(`
SELECT folder_id FROM folders
ORDER BY folder_id
`).Select(&res)
return res, wrap(err)
}
// wrap returns the error wrapped with the calling function name and
@@ -507,94 +68,3 @@ func wrap(err error, context ...string) error {
return fmt.Errorf("%s: %w", prefix, err)
}
type fileRow struct {
Name string
Version dbVector
FolderIdx int64 `db:"folder_idx"`
DeviceIdx int64 `db:"device_idx"`
Sequence int64
Modified int64
Size int64
LocalFlags int64 `db:"local_flags"`
Deleted bool
Invalid bool
}
func (e fileRow) Compare(other fileRow) int {
// From FileInfo.WinsConflict
vc := e.Version.Vector.Compare(other.Version.Vector)
switch vc {
case protocol.Equal:
if e.Invalid != other.Invalid {
if e.Invalid {
return 1
}
return -1
}
// Compare the device ID index, lower is better. This is only
// deterministic to the extent that LocalDeviceID will always be the
// lowest one, order between remote devices is random (and
// irrelevant).
return cmp.Compare(e.DeviceIdx, other.DeviceIdx)
case protocol.Greater: // we are newer
return -1
case protocol.Lesser: // we are older
return 1
case protocol.ConcurrentGreater, protocol.ConcurrentLesser: // there is a conflict
if e.Invalid != other.Invalid {
if e.Invalid { // we are invalid, we lose
return 1
}
return -1 // they are invalid, we win
}
if e.Deleted != other.Deleted {
if e.Deleted { // we are deleted, we lose
return 1
}
return -1 // they are deleted, we win
}
if d := cmp.Compare(e.Modified, other.Modified); d != 0 {
return -d // positive d means we were newer, so we win (negative return)
}
if vc == protocol.ConcurrentGreater {
return -1 // we have a better device ID, we win
}
return 1 // they win
default:
return 0
}
}
func (s *DB) periodicCheckpointLocked(fs []protocol.FileInfo) {
// Induce periodic checkpoints. We add points for each file and block,
// and checkpoint when we've written more than a threshold of points.
// This ensures we do not go too long without a checkpoint, while also
// not doing it incessantly for every update.
s.updatePoints += updatePointsPerFile * len(fs)
for _, f := range fs {
s.updatePoints += len(f.Blocks) * updatePointsPerBlock
}
if s.updatePoints > updatePointsThreshold {
conn, err := s.sql.Conn(context.Background())
if err != nil {
l.Debugln("conn:", err)
return
}
defer conn.Close()
if _, err := conn.ExecContext(context.Background(), `PRAGMA journal_size_limit = 67108864`); err != nil {
l.Debugln("PRAGMA journal_size_limit:", err)
}
row := conn.QueryRowContext(context.Background(), `PRAGMA wal_checkpoint(RESTART)`)
var res, modified, moved int
if row.Err() != nil {
l.Debugln("PRAGMA wal_checkpoint(RESTART):", err)
} else if err := row.Scan(&res, &modified, &moved); err != nil {
l.Debugln("PRAGMA wal_checkpoint(RESTART) (scan):", err)
} else {
l.Debugln("checkpoint at", s.updatePoints, "returned", res, modified, moved)
}
s.updatePoints = 0
}
}

View File

@@ -19,95 +19,89 @@ type countsRow struct {
LocalFlags int64 `db:"local_flags"`
}
func (s *DB) CountLocal(folder string, device protocol.DeviceID) (db.Counts, error) {
func (s *folderDB) CountLocal(device protocol.DeviceID) (db.Counts, error) {
var res []countsRow
if err := s.stmt(`
SELECT s.type, s.count, s.size, s.local_flags, s.deleted FROM counts s
INNER JOIN folders o ON o.idx = s.folder_idx
INNER JOIN devices d ON d.idx = s.device_idx
WHERE o.folder_id = ? AND d.device_id = ? AND s.local_flags & {{.FlagLocalIgnored}} = 0
`).Select(&res, folder, device.String()); err != nil {
WHERE d.device_id = ? AND s.local_flags & {{.FlagLocalIgnored}} = 0
`).Select(&res, device.String()); err != nil {
return db.Counts{}, wrap(err)
}
return summarizeCounts(res), nil
}
func (s *DB) CountNeed(folder string, device protocol.DeviceID) (db.Counts, error) {
func (s *folderDB) CountNeed(device protocol.DeviceID) (db.Counts, error) {
if device == protocol.LocalDeviceID {
return s.needSizeLocal(folder)
return s.needSizeLocal()
}
return s.needSizeRemote(folder, device)
return s.needSizeRemote(device)
}
func (s *DB) CountGlobal(folder string) (db.Counts, error) {
func (s *folderDB) CountGlobal() (db.Counts, error) {
// Exclude ignored and receive-only changed files from the global count
// (legacy expectation? it's a bit weird since those files can in fact
// be global and you can get them with GetGlobal etc.)
var res []countsRow
err := s.stmt(`
SELECT s.type, s.count, s.size, s.local_flags, s.deleted FROM counts s
INNER JOIN folders o ON o.idx = s.folder_idx
WHERE o.folder_id = ? AND s.local_flags & {{.FlagLocalGlobal}} != 0 AND s.local_flags & {{or .FlagLocalReceiveOnly .FlagLocalIgnored}} = 0
`).Select(&res, folder)
WHERE s.local_flags & {{.FlagLocalGlobal}} != 0 AND s.local_flags & {{or .FlagLocalReceiveOnly .FlagLocalIgnored}} = 0
`).Select(&res)
if err != nil {
return db.Counts{}, wrap(err)
}
return summarizeCounts(res), nil
}
func (s *DB) CountReceiveOnlyChanged(folder string) (db.Counts, error) {
func (s *folderDB) CountReceiveOnlyChanged() (db.Counts, error) {
var res []countsRow
err := s.stmt(`
SELECT s.type, s.count, s.size, s.local_flags, s.deleted FROM counts s
INNER JOIN folders o ON o.idx = s.folder_idx
WHERE o.folder_id = ? AND local_flags & {{.FlagLocalReceiveOnly}} != 0
`).Select(&res, folder)
WHERE local_flags & {{.FlagLocalReceiveOnly}} != 0
`).Select(&res)
if err != nil {
return db.Counts{}, wrap(err)
}
return summarizeCounts(res), nil
}
func (s *DB) needSizeLocal(folder string) (db.Counts, error) {
func (s *folderDB) needSizeLocal() (db.Counts, error) {
// The need size for the local device is the sum of entries with the
// need bit set.
var res []countsRow
err := s.stmt(`
SELECT s.type, s.count, s.size, s.local_flags, s.deleted FROM counts s
INNER JOIN folders o ON o.idx = s.folder_idx
WHERE o.folder_id = ? AND s.local_flags & {{.FlagLocalNeeded}} != 0
`).Select(&res, folder)
WHERE s.local_flags & {{.FlagLocalNeeded}} != 0
`).Select(&res)
if err != nil {
return db.Counts{}, wrap(err)
}
return summarizeCounts(res), nil
}
func (s *DB) needSizeRemote(folder string, device protocol.DeviceID) (db.Counts, error) {
func (s *folderDB) needSizeRemote(device protocol.DeviceID) (db.Counts, error) {
var res []countsRow
// See neededGlobalFilesRemote for commentary as that is the same query without summing
if err := s.stmt(`
SELECT g.type, count(*) as count, sum(g.size) as size, g.local_flags, g.deleted FROM files g
INNER JOIN folders o ON o.idx = g.folder_idx
WHERE o.folder_id = ? AND g.local_flags & {{.FlagLocalGlobal}} != 0 AND NOT g.deleted AND NOT g.invalid AND NOT EXISTS (
WHERE g.local_flags & {{.FlagLocalGlobal}} != 0 AND NOT g.deleted AND NOT g.invalid AND NOT EXISTS (
SELECT 1 FROM FILES f
INNER JOIN devices d ON d.idx = f.device_idx
WHERE f.name = g.name AND f.version = g.version AND f.folder_idx = g.folder_idx AND d.device_id = ?
WHERE f.name = g.name AND f.version = g.version AND d.device_id = ?
)
GROUP BY g.type, g.local_flags, g.deleted
UNION ALL
SELECT g.type, count(*) as count, sum(g.size) as size, g.local_flags, g.deleted FROM files g
INNER JOIN folders o ON o.idx = g.folder_idx
WHERE o.folder_id = ? AND g.local_flags & {{.FlagLocalGlobal}} != 0 AND g.deleted AND NOT g.invalid AND EXISTS (
WHERE g.local_flags & {{.FlagLocalGlobal}} != 0 AND g.deleted AND NOT g.invalid AND EXISTS (
SELECT 1 FROM FILES f
INNER JOIN devices d ON d.idx = f.device_idx
WHERE f.name = g.name AND f.folder_idx = g.folder_idx AND d.device_id = ? AND NOT f.deleted
WHERE f.name = g.name AND d.device_id = ? AND NOT f.deleted
)
GROUP BY g.type, g.local_flags, g.deleted
`).Select(&res, folder, device.String(),
folder, device.String()); err != nil {
`).Select(&res, device.String(),
device.String()); err != nil {
return db.Counts{}, wrap(err)
}

View File

@@ -19,7 +19,7 @@ import (
"github.com/syncthing/syncthing/lib/protocol"
)
func (s *DB) GetGlobalFile(folder string, file string) (protocol.FileInfo, bool, error) {
func (s *folderDB) GetGlobalFile(file string) (protocol.FileInfo, bool, error) {
file = osutil.NormalizedFilename(file)
var ind indirectFI
@@ -27,9 +27,8 @@ func (s *DB) GetGlobalFile(folder string, file string) (protocol.FileInfo, bool,
SELECT fi.fiprotobuf, bl.blprotobuf FROM fileinfos fi
INNER JOIN files f on fi.sequence = f.sequence
LEFT JOIN blocklists bl ON bl.blocklist_hash = f.blocklist_hash
INNER JOIN folders o ON o.idx = f.folder_idx
WHERE o.folder_id = ? AND f.name = ? AND f.local_flags & {{.FlagLocalGlobal}} != 0
`).Get(&ind, folder, file)
WHERE f.name = ? AND f.local_flags & {{.FlagLocalGlobal}} != 0
`).Get(&ind, file)
if errors.Is(err, sql.ErrNoRows) {
return protocol.FileInfo{}, false, nil
}
@@ -43,18 +42,17 @@ func (s *DB) GetGlobalFile(folder string, file string) (protocol.FileInfo, bool,
return fi, true, nil
}
func (s *DB) GetGlobalAvailability(folder, file string) ([]protocol.DeviceID, error) {
func (s *folderDB) GetGlobalAvailability(file string) ([]protocol.DeviceID, error) {
file = osutil.NormalizedFilename(file)
var devStrs []string
err := s.stmt(`
SELECT d.device_id FROM files f
INNER JOIN devices d ON d.idx = f.device_idx
INNER JOIN folders o ON o.idx = f.folder_idx
INNER JOIN files g ON f.folder_idx = g.folder_idx AND g.version = f.version AND g.name = f.name
WHERE o.folder_id = ? AND g.name = ? AND g.local_flags & {{.FlagLocalGlobal}} != 0 AND f.device_idx != {{.LocalDeviceIdx}}
INNER JOIN files g ON g.version = f.version AND g.name = f.name
WHERE g.name = ? AND g.local_flags & {{.FlagLocalGlobal}} != 0 AND f.device_idx != {{.LocalDeviceIdx}}
ORDER BY d.device_id
`).Select(&devStrs, folder, file)
`).Select(&devStrs, file)
if errors.Is(err, sql.ErrNoRows) {
return nil, nil
}
@@ -74,22 +72,21 @@ func (s *DB) GetGlobalAvailability(folder, file string) ([]protocol.DeviceID, er
return devs, nil
}
func (s *DB) AllGlobalFiles(folder string) (iter.Seq[db.FileMetadata], func() error) {
func (s *folderDB) AllGlobalFiles() (iter.Seq[db.FileMetadata], func() error) {
it, errFn := iterStructs[db.FileMetadata](s.stmt(`
SELECT f.sequence, f.name, f.type, f.modified as modnanos, f.size, f.deleted, f.invalid, f.local_flags as localflags FROM files f
INNER JOIN folders o ON o.idx = f.folder_idx
WHERE o.folder_id = ? AND f.local_flags & {{.FlagLocalGlobal}} != 0
WHERE f.local_flags & {{.FlagLocalGlobal}} != 0
ORDER BY f.name
`).Queryx(folder))
`).Queryx())
return itererr.Map(it, errFn, func(m db.FileMetadata) (db.FileMetadata, error) {
m.Name = osutil.NativeFilename(m.Name)
return m, nil
})
}
func (s *DB) AllGlobalFilesPrefix(folder string, prefix string) (iter.Seq[db.FileMetadata], func() error) {
func (s *folderDB) AllGlobalFilesPrefix(prefix string) (iter.Seq[db.FileMetadata], func() error) {
if prefix == "" {
return s.AllGlobalFiles(folder)
return s.AllGlobalFiles()
}
prefix = osutil.NormalizedFilename(prefix)
@@ -97,17 +94,16 @@ func (s *DB) AllGlobalFilesPrefix(folder string, prefix string) (iter.Seq[db.Fil
it, errFn := iterStructs[db.FileMetadata](s.stmt(`
SELECT f.sequence, f.name, f.type, f.modified as modnanos, f.size, f.deleted, f.invalid, f.local_flags as localflags FROM files f
INNER JOIN folders o ON o.idx = f.folder_idx
WHERE o.folder_id = ? AND f.name >= ? AND f.name < ? AND f.local_flags & {{.FlagLocalGlobal}} != 0
WHERE f.name >= ? AND f.name < ? AND f.local_flags & {{.FlagLocalGlobal}} != 0
ORDER BY f.name
`).Queryx(folder, prefix, end))
`).Queryx(prefix, end))
return itererr.Map(it, errFn, func(m db.FileMetadata) (db.FileMetadata, error) {
m.Name = osutil.NativeFilename(m.Name)
return m, nil
})
}
func (s *DB) AllNeededGlobalFiles(folder string, device protocol.DeviceID, order config.PullOrder, limit, offset int) (iter.Seq[protocol.FileInfo], func() error) {
func (s *folderDB) AllNeededGlobalFiles(device protocol.DeviceID, order config.PullOrder, limit, offset int) (iter.Seq[protocol.FileInfo], func() error) {
var selectOpts string
switch order {
case config.PullOrderRandom:
@@ -132,25 +128,24 @@ func (s *DB) AllNeededGlobalFiles(folder string, device protocol.DeviceID, order
}
if device == protocol.LocalDeviceID {
return s.neededGlobalFilesLocal(folder, selectOpts)
return s.neededGlobalFilesLocal(selectOpts)
}
return s.neededGlobalFilesRemote(folder, device, selectOpts)
return s.neededGlobalFilesRemote(device, selectOpts)
}
func (s *DB) neededGlobalFilesLocal(folder, selectOpts string) (iter.Seq[protocol.FileInfo], func() error) {
func (s *folderDB) neededGlobalFilesLocal(selectOpts string) (iter.Seq[protocol.FileInfo], func() error) {
// Select all the non-ignored files with the need bit set.
it, errFn := iterStructs[indirectFI](s.stmt(`
SELECT fi.fiprotobuf, bl.blprotobuf, g.name, g.size, g.modified FROM fileinfos fi
INNER JOIN files g on fi.sequence = g.sequence
LEFT JOIN blocklists bl ON bl.blocklist_hash = g.blocklist_hash
INNER JOIN folders o ON o.idx = g.folder_idx
WHERE o.folder_id = ? AND g.local_flags & {{.FlagLocalIgnored}} = 0 AND g.local_flags & {{.FlagLocalNeeded}} != 0
` + selectOpts).Queryx(folder))
WHERE g.local_flags & {{.FlagLocalIgnored}} = 0 AND g.local_flags & {{.FlagLocalNeeded}} != 0
` + selectOpts).Queryx())
return itererr.Map(it, errFn, indirectFI.FileInfo)
}
func (s *DB) neededGlobalFilesRemote(folder string, device protocol.DeviceID, selectOpts string) (iter.Seq[protocol.FileInfo], func() error) {
func (s *folderDB) neededGlobalFilesRemote(device protocol.DeviceID, selectOpts string) (iter.Seq[protocol.FileInfo], func() error) {
// Select:
//
// - all the valid, non-deleted global files that don't have a corresponding
@@ -163,11 +158,10 @@ func (s *DB) neededGlobalFilesRemote(folder string, device protocol.DeviceID, se
SELECT fi.fiprotobuf, bl.blprotobuf, g.name, g.size, g.modified FROM fileinfos fi
INNER JOIN files g on fi.sequence = g.sequence
LEFT JOIN blocklists bl ON bl.blocklist_hash = g.blocklist_hash
INNER JOIN folders o ON o.idx = g.folder_idx
WHERE o.folder_id = ? AND g.local_flags & {{.FlagLocalGlobal}} != 0 AND NOT g.deleted AND NOT g.invalid AND NOT EXISTS (
WHERE g.local_flags & {{.FlagLocalGlobal}} != 0 AND NOT g.deleted AND NOT g.invalid AND NOT EXISTS (
SELECT 1 FROM FILES f
INNER JOIN devices d ON d.idx = f.device_idx
WHERE f.name = g.name AND f.version = g.version AND f.folder_idx = g.folder_idx AND d.device_id = ?
WHERE f.name = g.name AND f.version = g.version AND d.device_id = ?
)
UNION ALL
@@ -175,15 +169,14 @@ func (s *DB) neededGlobalFilesRemote(folder string, device protocol.DeviceID, se
SELECT fi.fiprotobuf, bl.blprotobuf, g.name, g.size, g.modified FROM fileinfos fi
INNER JOIN files g on fi.sequence = g.sequence
LEFT JOIN blocklists bl ON bl.blocklist_hash = g.blocklist_hash
INNER JOIN folders o ON o.idx = g.folder_idx
WHERE o.folder_id = ? AND g.local_flags & {{.FlagLocalGlobal}} != 0 AND g.deleted AND NOT g.invalid AND EXISTS (
WHERE g.local_flags & {{.FlagLocalGlobal}} != 0 AND g.deleted AND NOT g.invalid AND EXISTS (
SELECT 1 FROM FILES f
INNER JOIN devices d ON d.idx = f.device_idx
WHERE f.name = g.name AND f.folder_idx = g.folder_idx AND d.device_id = ? AND NOT f.deleted
WHERE f.name = g.name AND d.device_id = ? AND NOT f.deleted
)
`+selectOpts).Queryx(
folder, device.String(),
folder, device.String(),
device.String(),
device.String(),
))
return itererr.Map(it, errFn, indirectFI.FileInfo)
}

View File

@@ -16,16 +16,15 @@ import (
"github.com/syncthing/syncthing/lib/protocol"
)
func (s *DB) GetIndexID(folder string, device protocol.DeviceID) (protocol.IndexID, error) {
func (s *folderDB) GetIndexID(device protocol.DeviceID) (protocol.IndexID, error) {
// Try a fast read-only query to begin with. If it does not find the ID
// we'll do the full thing under a lock.
var indexID string
if err := s.stmt(`
SELECT i.index_id FROM indexids i
INNER JOIN folders o ON o.idx = i.folder_idx
INNER JOIN devices d ON d.idx = i.device_idx
WHERE o.folder_id = ? AND d.device_id = ?
`).Get(&indexID, folder, device.String()); err == nil && indexID != "" {
WHERE d.device_id = ?
`).Get(&indexID, device.String()); err == nil && indexID != "" {
idx, err := indexIDFromHex(indexID)
return idx, wrap(err, "select")
}
@@ -40,14 +39,9 @@ func (s *DB) GetIndexID(folder string, device protocol.DeviceID) (protocol.Index
// We are now operating only for the local device ID
folderIdx, err := s.folderIdxLocked(folder)
if err != nil {
return 0, wrap(err)
}
if err := s.stmt(`
SELECT index_id FROM indexids WHERE folder_idx = ? AND device_idx = {{.LocalDeviceIdx}}
`).Get(&indexID, folderIdx); err != nil && !errors.Is(err, sql.ErrNoRows) {
SELECT index_id FROM indexids WHERE device_idx = {{.LocalDeviceIdx}}
`).Get(&indexID); err != nil && !errors.Is(err, sql.ErrNoRows) {
return 0, wrap(err, "select local")
}
@@ -57,11 +51,11 @@ func (s *DB) GetIndexID(folder string, device protocol.DeviceID) (protocol.Index
// any.
id := protocol.NewIndexID()
if _, err := s.stmt(`
INSERT INTO indexids (folder_idx, device_idx, index_id, sequence)
SELECT ?, {{.LocalDeviceIdx}}, ?, COALESCE(MAX(sequence), 0) FROM files
WHERE folder_idx = ? AND device_idx = {{.LocalDeviceIdx}}
INSERT INTO indexids (device_idx, index_id, sequence)
SELECT {{.LocalDeviceIdx}}, ?, COALESCE(MAX(sequence), 0) FROM files
WHERE device_idx = {{.LocalDeviceIdx}}
ON CONFLICT DO UPDATE SET index_id = ?
`).Exec(folderIdx, indexIDToHex(id), folderIdx, indexIDToHex(id)); err != nil {
`).Exec(indexIDToHex(id), indexIDToHex(id)); err != nil {
return 0, wrap(err, "insert")
}
return id, nil
@@ -70,42 +64,37 @@ func (s *DB) GetIndexID(folder string, device protocol.DeviceID) (protocol.Index
return indexIDFromHex(indexID)
}
func (s *DB) SetIndexID(folder string, device protocol.DeviceID, id protocol.IndexID) error {
func (s *folderDB) SetIndexID(device protocol.DeviceID, id protocol.IndexID) error {
s.updateLock.Lock()
defer s.updateLock.Unlock()
folderIdx, err := s.folderIdxLocked(folder)
if err != nil {
return wrap(err, "folder idx")
}
deviceIdx, err := s.deviceIdxLocked(device)
if err != nil {
return wrap(err, "device idx")
}
if _, err := s.stmt(`
INSERT OR REPLACE INTO indexids (folder_idx, device_idx, index_id, sequence) values (?, ?, ?, 0)
`).Exec(folderIdx, deviceIdx, indexIDToHex(id)); err != nil {
INSERT OR REPLACE INTO indexids (device_idx, index_id, sequence) values (?, ?, 0)
`).Exec(deviceIdx, indexIDToHex(id)); err != nil {
return wrap(err, "insert")
}
return nil
}
func (s *DB) DropAllIndexIDs() error {
func (s *folderDB) DropAllIndexIDs() error {
s.updateLock.Lock()
defer s.updateLock.Unlock()
_, err := s.stmt(`DELETE FROM indexids`).Exec()
return wrap(err)
}
func (s *DB) GetDeviceSequence(folder string, device protocol.DeviceID) (int64, error) {
func (s *folderDB) GetDeviceSequence(device protocol.DeviceID) (int64, error) {
var res sql.NullInt64
err := s.stmt(`
SELECT sequence FROM indexids i
INNER JOIN folders o ON o.idx = i.folder_idx
INNER JOIN devices d ON d.idx = i.device_idx
WHERE o.folder_id = ? AND d.device_id = ?
`).Get(&res, folder, device.String())
WHERE d.device_id = ?
`).Get(&res, device.String())
if errors.Is(err, sql.ErrNoRows) {
return 0, nil
}
@@ -118,7 +107,7 @@ func (s *DB) GetDeviceSequence(folder string, device protocol.DeviceID) (int64,
return res.Int64, nil
}
func (s *DB) RemoteSequences(folder string) (map[protocol.DeviceID]int64, error) {
func (s *folderDB) RemoteSequences() (map[protocol.DeviceID]int64, error) {
type row struct {
Device string
Seq int64
@@ -126,10 +115,9 @@ func (s *DB) RemoteSequences(folder string) (map[protocol.DeviceID]int64, error)
it, errFn := iterStructs[row](s.stmt(`
SELECT d.device_id AS device, i.sequence AS seq FROM indexids i
INNER JOIN folders o ON o.idx = i.folder_idx
INNER JOIN devices d ON d.idx = i.device_idx
WHERE o.folder_id = ? AND i.device_idx != {{.LocalDeviceIdx}}
`).Queryx(folder))
WHERE i.device_idx != {{.LocalDeviceIdx}}
`).Queryx())
res := make(map[protocol.DeviceID]int64)
for row, err := range itererr.Zip(it, errFn) {

View File

@@ -18,7 +18,7 @@ import (
"github.com/syncthing/syncthing/lib/protocol"
)
func (s *DB) GetDeviceFile(folder string, device protocol.DeviceID, file string) (protocol.FileInfo, bool, error) {
func (s *folderDB) GetDeviceFile(device protocol.DeviceID, file string) (protocol.FileInfo, bool, error) {
file = osutil.NormalizedFilename(file)
var ind indirectFI
@@ -27,9 +27,8 @@ func (s *DB) GetDeviceFile(folder string, device protocol.DeviceID, file string)
INNER JOIN files f on fi.sequence = f.sequence
LEFT JOIN blocklists bl ON bl.blocklist_hash = f.blocklist_hash
INNER JOIN devices d ON f.device_idx = d.idx
INNER JOIN folders o ON f.folder_idx = o.idx
WHERE o.folder_id = ? AND d.device_id = ? AND f.name = ?
`).Get(&ind, folder, device.String(), file)
WHERE d.device_id = ? AND f.name = ?
`).Get(&ind, device.String(), file)
if errors.Is(err, sql.ErrNoRows) {
return protocol.FileInfo{}, false, nil
}
@@ -43,19 +42,18 @@ func (s *DB) GetDeviceFile(folder string, device protocol.DeviceID, file string)
return fi, true, nil
}
func (s *DB) AllLocalFiles(folder string, device protocol.DeviceID) (iter.Seq[protocol.FileInfo], func() error) {
func (s *folderDB) AllLocalFiles(device protocol.DeviceID) (iter.Seq[protocol.FileInfo], func() error) {
it, errFn := iterStructs[indirectFI](s.stmt(`
SELECT fi.fiprotobuf, bl.blprotobuf FROM fileinfos fi
INNER JOIN files f on fi.sequence = f.sequence
LEFT JOIN blocklists bl ON bl.blocklist_hash = f.blocklist_hash
INNER JOIN folders o ON o.idx = f.folder_idx
INNER JOIN devices d ON d.idx = f.device_idx
WHERE o.folder_id = ? AND d.device_id = ?
`).Queryx(folder, device.String()))
WHERE d.device_id = ?
`).Queryx(device.String()))
return itererr.Map(it, errFn, indirectFI.FileInfo)
}
func (s *DB) AllLocalFilesBySequence(folder string, device protocol.DeviceID, startSeq int64, limit int) (iter.Seq[protocol.FileInfo], func() error) {
func (s *folderDB) AllLocalFilesBySequence(device protocol.DeviceID, startSeq int64, limit int) (iter.Seq[protocol.FileInfo], func() error) {
var limitStr string
if limit > 0 {
limitStr = fmt.Sprintf(" LIMIT %d", limit)
@@ -64,17 +62,16 @@ func (s *DB) AllLocalFilesBySequence(folder string, device protocol.DeviceID, st
SELECT fi.fiprotobuf, bl.blprotobuf FROM fileinfos fi
INNER JOIN files f on fi.sequence = f.sequence
LEFT JOIN blocklists bl ON bl.blocklist_hash = f.blocklist_hash
INNER JOIN folders o ON o.idx = f.folder_idx
INNER JOIN devices d ON d.idx = f.device_idx
WHERE o.folder_id = ? AND d.device_id = ? AND f.sequence >= ?
WHERE d.device_id = ? AND f.sequence >= ?
ORDER BY f.sequence`+limitStr).Queryx(
folder, device.String(), startSeq))
device.String(), startSeq))
return itererr.Map(it, errFn, indirectFI.FileInfo)
}
func (s *DB) AllLocalFilesWithPrefix(folder string, device protocol.DeviceID, prefix string) (iter.Seq[protocol.FileInfo], func() error) {
func (s *folderDB) AllLocalFilesWithPrefix(device protocol.DeviceID, prefix string) (iter.Seq[protocol.FileInfo], func() error) {
if prefix == "" {
return s.AllLocalFiles(folder, device)
return s.AllLocalFiles(device)
}
prefix = osutil.NormalizedFilename(prefix)
@@ -84,37 +81,20 @@ func (s *DB) AllLocalFilesWithPrefix(folder string, device protocol.DeviceID, pr
SELECT fi.fiprotobuf, bl.blprotobuf FROM fileinfos fi
INNER JOIN files f on fi.sequence = f.sequence
LEFT JOIN blocklists bl ON bl.blocklist_hash = f.blocklist_hash
INNER JOIN folders o ON o.idx = f.folder_idx
INNER JOIN devices d ON d.idx = f.device_idx
WHERE o.folder_id = ? AND d.device_id = ? AND f.name >= ? AND f.name < ?
`, folder, device.String(), prefix, end))
WHERE d.device_id = ? AND f.name >= ? AND f.name < ?
`, device.String(), prefix, end))
return itererr.Map(it, errFn, indirectFI.FileInfo)
}
func (s *DB) AllLocalFilesWithBlocksHash(folder string, h []byte) (iter.Seq[db.FileMetadata], func() error) {
func (s *folderDB) AllLocalFilesWithBlocksHash(h []byte) (iter.Seq[db.FileMetadata], func() error) {
return iterStructs[db.FileMetadata](s.stmt(`
SELECT f.sequence, f.name, f.type, f.modified as modnanos, f.size, f.deleted, f.invalid, f.local_flags as localflags FROM files f
INNER JOIN folders o ON o.idx = f.folder_idx
WHERE o.folder_id = ? AND f.device_idx = {{.LocalDeviceIdx}} AND f.blocklist_hash = ?
`).Queryx(folder, h))
}
func (s *DB) AllLocalFilesWithBlocksHashAnyFolder(h []byte) (iter.Seq2[string, db.FileMetadata], func() error) {
type row struct {
FolderID string `db:"folder_id"`
db.FileMetadata
}
it, errFn := iterStructs[row](s.stmt(`
SELECT o.folder_id, f.sequence, f.name, f.type, f.modified as modnanos, f.size, f.deleted, f.invalid, f.local_flags as localflags FROM files f
INNER JOIN folders o ON o.idx = f.folder_idx
WHERE f.device_idx = {{.LocalDeviceIdx}} AND f.blocklist_hash = ?
`).Queryx(h))
return itererr.Map2(it, errFn, func(r row) (string, db.FileMetadata, error) {
return r.FolderID, r.FileMetadata, nil
})
}
func (s *DB) AllLocalBlocksWithHash(hash []byte) (iter.Seq[db.BlockMapEntry], func() error) {
func (s *folderDB) AllLocalBlocksWithHash(hash []byte) (iter.Seq[db.BlockMapEntry], func() error) {
// We involve the files table in this select because deletion of blocks
// & blocklists is deferred (garbage collected) while the files list is
// not. This filters out blocks that are in fact deleted.
@@ -124,3 +104,25 @@ func (s *DB) AllLocalBlocksWithHash(hash []byte) (iter.Seq[db.BlockMapEntry], fu
WHERE f.device_idx = {{.LocalDeviceIdx}} AND b.hash = ?
`).Queryx(hash))
}
func (s *folderDB) ListDevicesForFolder() ([]protocol.DeviceID, error) {
var res []string
err := s.stmt(`
SELECT DISTINCT d.device_id FROM counts s
INNER JOIN devices d ON d.idx = s.device_idx
WHERE s.count > 0 AND s.device_idx != {{.LocalDeviceIdx}}
ORDER BY d.device_id
`).Select(&res)
if err != nil {
return nil, wrap(err)
}
devs := make([]protocol.DeviceID, len(res))
for i, s := range res {
devs[i], err = protocol.DeviceIDFromString(s)
if err != nil {
return nil, wrap(err)
}
}
return devs, nil
}

View File

@@ -0,0 +1,45 @@
// Copyright (C) 2025 The Syncthing Authors.
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
// You can obtain one at https://mozilla.org/MPL/2.0/.
package sqlite
import (
"time"
)
func (s *folderDB) GetMtime(name string) (ondisk, virtual time.Time) {
var res struct {
Ondisk int64
Virtual int64
}
if err := s.stmt(`
SELECT m.ondisk, m.virtual FROM mtimes m
WHERE m.name = ?
`).Get(&res, name); err != nil {
return time.Time{}, time.Time{}
}
return time.Unix(0, res.Ondisk), time.Unix(0, res.Virtual)
}
func (s *folderDB) PutMtime(name string, ondisk, virtual time.Time) error {
s.updateLock.Lock()
defer s.updateLock.Unlock()
_, err := s.stmt(`
INSERT OR REPLACE INTO mtimes (name, ondisk, virtual)
VALUES (?, ?, ?)
`).Exec(name, ondisk.UnixNano(), virtual.UnixNano())
return wrap(err)
}
func (s *folderDB) DeleteMtime(name string) error {
s.updateLock.Lock()
defer s.updateLock.Unlock()
_, err := s.stmt(`
DELETE FROM mtimes
WHERE name = ?
`).Exec(name)
return wrap(err)
}

View File

@@ -0,0 +1,110 @@
// Copyright (C) 2025 The Syncthing Authors.
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
// You can obtain one at https://mozilla.org/MPL/2.0/.
package sqlite
import (
"time"
"github.com/syncthing/syncthing/lib/protocol"
)
type folderDB struct {
folderID string
*baseDB
localDeviceIdx int64
deleteRetention time.Duration
}
func openFolderDB(folder, path string, deleteRetention time.Duration) (*folderDB, error) {
pragmas := []string{
"journal_mode = WAL",
"optimize = 0x10002",
"auto_vacuum = INCREMENTAL",
"default_temp_store = MEMORY",
"temp_store = MEMORY",
}
schemas := []string{
"sql/schema/common/*",
"sql/schema/folder/*",
}
base, err := openBase(path, maxDBConns, pragmas, schemas, nil)
if err != nil {
return nil, err
}
fdb := &folderDB{
folderID: folder,
baseDB: base,
deleteRetention: deleteRetention,
}
_ = fdb.PutKV("folderID", []byte(folder))
// Touch device IDs that should always exist and have a low index
// numbers, and will never change
fdb.localDeviceIdx, _ = fdb.deviceIdxLocked(protocol.LocalDeviceID)
fdb.tplInput["LocalDeviceIdx"] = fdb.localDeviceIdx
return fdb, nil
}
// Open the database with options suitable for the migration inserts. This
// is not a safe mode of operation for normal processing, use only for bulk
// inserts with a close afterwards.
func openFolderDBForMigration(folder, path string, deleteRetention time.Duration) (*folderDB, error) {
pragmas := []string{
"journal_mode = OFF",
"default_temp_store = MEMORY",
"temp_store = MEMORY",
"foreign_keys = 0",
"synchronous = 0",
"locking_mode = EXCLUSIVE",
}
schemas := []string{
"sql/schema/common/*",
"sql/schema/folder/*",
}
base, err := openBase(path, 1, pragmas, schemas, nil)
if err != nil {
return nil, err
}
fdb := &folderDB{
folderID: folder,
baseDB: base,
deleteRetention: deleteRetention,
}
// Touch device IDs that should always exist and have a low index
// numbers, and will never change
fdb.localDeviceIdx, _ = fdb.deviceIdxLocked(protocol.LocalDeviceID)
fdb.tplInput["LocalDeviceIdx"] = fdb.localDeviceIdx
return fdb, nil
}
func (s *folderDB) deviceIdxLocked(deviceID protocol.DeviceID) (int64, error) {
devStr := deviceID.String()
if _, err := s.stmt(`
INSERT OR IGNORE INTO devices(device_id)
VALUES (?)
`).Exec(devStr); err != nil {
return 0, wrap(err)
}
var idx int64
if err := s.stmt(`
SELECT idx FROM devices
WHERE device_id = ?
`).Get(&idx, devStr); err != nil {
return 0, wrap(err)
}
return idx, nil
}

View File

@@ -0,0 +1,531 @@
// Copyright (C) 2025 The Syncthing Authors.
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
// You can obtain one at https://mozilla.org/MPL/2.0/.
package sqlite
import (
"cmp"
"context"
"fmt"
"slices"
"github.com/jmoiron/sqlx"
"github.com/syncthing/syncthing/internal/gen/dbproto"
"github.com/syncthing/syncthing/internal/itererr"
"github.com/syncthing/syncthing/lib/osutil"
"github.com/syncthing/syncthing/lib/protocol"
"github.com/syncthing/syncthing/lib/sliceutil"
"google.golang.org/protobuf/proto"
)
const (
// Arbitrarily chosen values for checkpoint frequency....
updatePointsPerFile = 100
updatePointsPerBlock = 1
updatePointsThreshold = 250_000
)
func (s *folderDB) Update(device protocol.DeviceID, fs []protocol.FileInfo) error {
s.updateLock.Lock()
defer s.updateLock.Unlock()
deviceIdx, err := s.deviceIdxLocked(device)
if err != nil {
return wrap(err)
}
tx, err := s.sql.BeginTxx(context.Background(), nil)
if err != nil {
return wrap(err)
}
defer tx.Rollback() //nolint:errcheck
txp := &txPreparedStmts{Tx: tx}
//nolint:sqlclosecheck
insertFileStmt, err := txp.Preparex(`
INSERT OR REPLACE INTO files (device_idx, remote_sequence, name, type, modified, size, version, deleted, invalid, local_flags, blocklist_hash)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
RETURNING sequence
`)
if err != nil {
return wrap(err, "prepare insert file")
}
//nolint:sqlclosecheck
insertFileInfoStmt, err := txp.Preparex(`
INSERT INTO fileinfos (sequence, fiprotobuf)
VALUES (?, ?)
`)
if err != nil {
return wrap(err, "prepare insert fileinfo")
}
//nolint:sqlclosecheck
insertBlockListStmt, err := txp.Preparex(`
INSERT OR IGNORE INTO blocklists (blocklist_hash, blprotobuf)
VALUES (?, ?)
`)
if err != nil {
return wrap(err, "prepare insert blocklist")
}
var prevRemoteSeq int64
for i, f := range fs {
f.Name = osutil.NormalizedFilename(f.Name)
var blockshash *[]byte
if len(f.Blocks) > 0 {
f.BlocksHash = protocol.BlocksHash(f.Blocks)
blockshash = &f.BlocksHash
} else {
f.BlocksHash = nil
}
if f.Type == protocol.FileInfoTypeDirectory {
f.Size = 128 // synthetic directory size
}
// Insert the file.
//
// If it is a remote file, set remote_sequence otherwise leave it at
// null. Returns the new local sequence.
var remoteSeq *int64
if device != protocol.LocalDeviceID {
if i > 0 && f.Sequence == prevRemoteSeq {
return fmt.Errorf("duplicate remote sequence number %d", prevRemoteSeq)
}
prevRemoteSeq = f.Sequence
remoteSeq = &f.Sequence
}
var localSeq int64
if err := insertFileStmt.Get(&localSeq, deviceIdx, remoteSeq, f.Name, f.Type, f.ModTime().UnixNano(), f.Size, f.Version.String(), f.IsDeleted(), f.IsInvalid(), f.LocalFlags, blockshash); err != nil {
return wrap(err, "insert file")
}
if len(f.Blocks) > 0 {
// Indirect the block list
blocks := sliceutil.Map(f.Blocks, protocol.BlockInfo.ToWire)
bs, err := proto.Marshal(&dbproto.BlockList{Blocks: blocks})
if err != nil {
return wrap(err, "marshal blocklist")
}
if _, err := insertBlockListStmt.Exec(f.BlocksHash, bs); err != nil {
return wrap(err, "insert blocklist")
}
if device == protocol.LocalDeviceID {
// Insert all blocks
if err := s.insertBlocksLocked(txp, f.BlocksHash, f.Blocks); err != nil {
return wrap(err, "insert blocks")
}
}
f.Blocks = nil
}
// Insert the fileinfo
if device == protocol.LocalDeviceID {
f.Sequence = localSeq
}
bs, err := proto.Marshal(f.ToWire(true))
if err != nil {
return wrap(err, "marshal fileinfo")
}
if _, err := insertFileInfoStmt.Exec(localSeq, bs); err != nil {
return wrap(err, "insert fileinfo")
}
// Update global and need
if err := s.recalcGlobalForFile(txp, f.Name); err != nil {
return wrap(err)
}
}
if err := tx.Commit(); err != nil {
return wrap(err)
}
s.periodicCheckpointLocked(fs)
return nil
}
func (s *folderDB) DropDevice(device protocol.DeviceID) error {
if device == protocol.LocalDeviceID {
panic("bug: cannot drop local device")
}
s.updateLock.Lock()
defer s.updateLock.Unlock()
tx, err := s.sql.BeginTxx(context.Background(), nil)
if err != nil {
return wrap(err)
}
defer tx.Rollback() //nolint:errcheck
txp := &txPreparedStmts{Tx: tx}
// Drop the device, which cascades to delete all files etc for it
if _, err := tx.Exec(`DELETE FROM devices WHERE device_id = ?`, device.String()); err != nil {
return wrap(err)
}
// Recalc the globals for all affected folders
if err := s.recalcGlobalForFolder(txp); err != nil {
return wrap(err)
}
return wrap(tx.Commit())
}
func (s *folderDB) DropAllFiles(device protocol.DeviceID) error {
s.updateLock.Lock()
defer s.updateLock.Unlock()
// This is a two part operation, first dropping all the files and then
// recalculating the global state for the entire folder.
deviceIdx, err := s.deviceIdxLocked(device)
if err != nil {
return wrap(err)
}
tx, err := s.sql.BeginTxx(context.Background(), nil)
if err != nil {
return wrap(err)
}
defer tx.Rollback() //nolint:errcheck
txp := &txPreparedStmts{Tx: tx}
// Drop all the file entries
result, err := tx.Exec(`
DELETE FROM files
WHERE device_idx = ?
`, deviceIdx)
if err != nil {
return wrap(err)
}
if n, err := result.RowsAffected(); err == nil && n == 0 {
// The delete affected no rows, so we don't need to redo the entire
// global/need calculation.
return wrap(tx.Commit())
}
// Recalc global for the entire folder
if err := s.recalcGlobalForFolder(txp); err != nil {
return wrap(err)
}
return wrap(tx.Commit())
}
func (s *folderDB) DropFilesNamed(device protocol.DeviceID, names []string) error {
for i := range names {
names[i] = osutil.NormalizedFilename(names[i])
}
s.updateLock.Lock()
defer s.updateLock.Unlock()
deviceIdx, err := s.deviceIdxLocked(device)
if err != nil {
return wrap(err)
}
tx, err := s.sql.BeginTxx(context.Background(), nil)
if err != nil {
return wrap(err)
}
defer tx.Rollback() //nolint:errcheck
txp := &txPreparedStmts{Tx: tx}
// Drop the named files
query, args, err := sqlx.In(`
DELETE FROM files
WHERE device_idx = ? AND name IN (?)
`, deviceIdx, names)
if err != nil {
return wrap(err)
}
if _, err := tx.Exec(query, args...); err != nil {
return wrap(err)
}
// Recalc globals for the named files
for _, name := range names {
if err := s.recalcGlobalForFile(txp, name); err != nil {
return wrap(err)
}
}
return wrap(tx.Commit())
}
func (*folderDB) insertBlocksLocked(tx *txPreparedStmts, blocklistHash []byte, blocks []protocol.BlockInfo) error {
if len(blocks) == 0 {
return nil
}
bs := make([]map[string]any, len(blocks))
for i, b := range blocks {
bs[i] = map[string]any{
"hash": b.Hash,
"blocklist_hash": blocklistHash,
"idx": i,
"offset": b.Offset,
"size": b.Size,
}
}
// Very large block lists (>8000 blocks) result in "too many variables"
// error. Chunk it to a reasonable size.
for chunk := range slices.Chunk(bs, 1000) {
if _, err := tx.NamedExec(`
INSERT OR IGNORE INTO blocks (hash, blocklist_hash, idx, offset, size)
VALUES (:hash, :blocklist_hash, :idx, :offset, :size)
`, chunk); err != nil {
return wrap(err)
}
}
return nil
}
func (s *folderDB) recalcGlobalForFolder(txp *txPreparedStmts) error {
// Select files where there is no global, those are the ones we need to
// recalculate.
//nolint:sqlclosecheck
namesStmt, err := txp.Preparex(`
SELECT f.name FROM files f
WHERE NOT EXISTS (
SELECT 1 FROM files g
WHERE g.name = f.name AND g.local_flags & ? != 0
)
GROUP BY name
`)
if err != nil {
return wrap(err)
}
rows, err := namesStmt.Queryx(protocol.FlagLocalGlobal)
if err != nil {
return wrap(err)
}
defer rows.Close()
for rows.Next() {
var name string
if err := rows.Scan(&name); err != nil {
return wrap(err)
}
if err := s.recalcGlobalForFile(txp, name); err != nil {
return wrap(err)
}
}
return wrap(rows.Err())
}
func (s *folderDB) recalcGlobalForFile(txp *txPreparedStmts, file string) error {
//nolint:sqlclosecheck
selStmt, err := txp.Preparex(`
SELECT name, device_idx, sequence, modified, version, deleted, invalid, local_flags FROM files
WHERE name = ?
`)
if err != nil {
return wrap(err)
}
es, err := itererr.Collect(iterStructs[fileRow](selStmt.Queryx(file)))
if err != nil {
return wrap(err)
}
if len(es) == 0 {
// shouldn't happen
return nil
}
// Sort the entries; the global entry is at the head of the list
slices.SortFunc(es, fileRow.Compare)
// The global version is the first one in the list that is not invalid,
// or just the first one in the list if all are invalid.
var global fileRow
globIdx := slices.IndexFunc(es, func(e fileRow) bool { return !e.Invalid })
if globIdx < 0 {
globIdx = 0
}
global = es[globIdx]
// We "have" the file if the position in the list of versions is at the
// global version or better, or if the version is the same as the global
// file (we might be further down the list due to invalid flags), or if
// the global is deleted and we don't have it at all...
localIdx := slices.IndexFunc(es, func(e fileRow) bool { return e.DeviceIdx == s.localDeviceIdx })
hasLocal := localIdx >= 0 && localIdx <= globIdx || // have a better or equal version
localIdx >= 0 && es[localIdx].Version.Equal(global.Version.Vector) || // have an equal version but invalid/ignored
localIdx < 0 && global.Deleted // missing it, but the global is also deleted
// Set the global flag on the global entry. Set the need flag if the
// local device needs this file, unless it's invalid.
global.LocalFlags |= protocol.FlagLocalGlobal
if hasLocal || global.Invalid {
global.LocalFlags &= ^protocol.FlagLocalNeeded
} else {
global.LocalFlags |= protocol.FlagLocalNeeded
}
//nolint:sqlclosecheck
upStmt, err := txp.Prepare(`
UPDATE files SET local_flags = ?
WHERE device_idx = ? AND sequence = ?
`)
if err != nil {
return wrap(err)
}
if _, err := upStmt.Exec(global.LocalFlags, global.DeviceIdx, global.Sequence); err != nil {
return wrap(err)
}
// Clear the need and global flags on all other entries
//nolint:sqlclosecheck
upStmt, err = txp.Prepare(`
UPDATE files SET local_flags = local_flags & ?
WHERE name = ? AND sequence != ? AND local_flags & ? != 0
`)
if err != nil {
return wrap(err)
}
if _, err := upStmt.Exec(^(protocol.FlagLocalNeeded | protocol.FlagLocalGlobal), global.Name, global.Sequence, protocol.FlagLocalNeeded|protocol.FlagLocalGlobal); err != nil {
return wrap(err)
}
return nil
}
func (s *DB) folderIdxLocked(folderID string) (int64, error) {
if _, err := s.stmt(`
INSERT OR IGNORE INTO folders(folder_id)
VALUES (?)
`).Exec(folderID); err != nil {
return 0, wrap(err)
}
var idx int64
if err := s.stmt(`
SELECT idx FROM folders
WHERE folder_id = ?
`).Get(&idx, folderID); err != nil {
return 0, wrap(err)
}
return idx, nil
}
type fileRow struct {
Name string
Version dbVector
DeviceIdx int64 `db:"device_idx"`
Sequence int64
Modified int64
Size int64
LocalFlags int64 `db:"local_flags"`
Deleted bool
Invalid bool
}
func (e fileRow) Compare(other fileRow) int {
// From FileInfo.WinsConflict
vc := e.Version.Vector.Compare(other.Version.Vector)
switch vc {
case protocol.Equal:
if e.Invalid != other.Invalid {
if e.Invalid {
return 1
}
return -1
}
// Compare the device ID index, lower is better. This is only
// deterministic to the extent that LocalDeviceID will always be the
// lowest one, order between remote devices is random (and
// irrelevant).
return cmp.Compare(e.DeviceIdx, other.DeviceIdx)
case protocol.Greater: // we are newer
return -1
case protocol.Lesser: // we are older
return 1
case protocol.ConcurrentGreater, protocol.ConcurrentLesser: // there is a conflict
if e.Invalid != other.Invalid {
if e.Invalid { // we are invalid, we lose
return 1
}
return -1 // they are invalid, we win
}
if e.Deleted != other.Deleted {
if e.Deleted { // we are deleted, we lose
return 1
}
return -1 // they are deleted, we win
}
if d := cmp.Compare(e.Modified, other.Modified); d != 0 {
return -d // positive d means we were newer, so we win (negative return)
}
if vc == protocol.ConcurrentGreater {
return -1 // we have a better device ID, we win
}
return 1 // they win
default:
return 0
}
}
func (s *folderDB) periodicCheckpointLocked(fs []protocol.FileInfo) {
// Induce periodic checkpoints. We add points for each file and block,
// and checkpoint when we've written more than a threshold of points.
// This ensures we do not go too long without a checkpoint, while also
// not doing it incessantly for every update.
s.updatePoints += updatePointsPerFile * len(fs)
for _, f := range fs {
s.updatePoints += len(f.Blocks) * updatePointsPerBlock
}
if s.updatePoints > updatePointsThreshold {
conn, err := s.sql.Conn(context.Background())
if err != nil {
l.Debugln(s.baseName, "conn:", err)
return
}
defer conn.Close()
if _, err := conn.ExecContext(context.Background(), `PRAGMA journal_size_limit = 8388608`); err != nil {
l.Debugln(s.baseName, "PRAGMA journal_size_limit:", err)
}
// Every 50th checkpoint becomes a truncate, in an effort to bring
// down the size now and then.
checkpointType := "RESTART"
if s.checkpointsCount > 50 {
checkpointType = "TRUNCATE"
}
cmd := fmt.Sprintf(`PRAGMA wal_checkpoint(%s)`, checkpointType)
row := conn.QueryRowContext(context.Background(), cmd)
var res, modified, moved int
if row.Err() != nil {
l.Debugln(s.baseName, cmd+":", err)
} else if err := row.Scan(&res, &modified, &moved); err != nil {
l.Debugln(s.baseName, cmd+" (scan):", err)
} else {
l.Debugln(s.baseName, cmd, s.checkpointsCount, "at", s.updatePoints, "returned", res, modified, moved)
}
// Reset the truncate counter when a truncate succeeded. If it
// failed, we'll keep trying it until we succeed. Increase it faster
// when we fail to checkpoint, as it's more likely the WAL is
// growing and will need truncation when we get out of this state.
if res == 1 {
s.checkpointsCount += 10
} else if res == 0 && checkpointType == "TRUNCATE" {
s.checkpointsCount = 0
} else {
s.checkpointsCount++
}
s.updatePoints = 0
}
}

View File

@@ -4,16 +4,9 @@
-- License, v. 2.0. If a copy of the MPL was not distributed with this file,
-- You can obtain one at https://mozilla.org/MPL/2.0/.
-- folders map folder IDs as used by Syncthing to database folder indexes
CREATE TABLE IF NOT EXISTS folders (
idx INTEGER NOT NULL PRIMARY KEY,
folder_id TEXT NOT NULL UNIQUE COLLATE BINARY
) STRICT
;
-- devices map device IDs as used by Syncthing to database device indexes
CREATE TABLE IF NOT EXISTS devices (
idx INTEGER NOT NULL PRIMARY KEY,
idx INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
device_id TEXT NOT NULL UNIQUE COLLATE BINARY
) STRICT
;

View File

@@ -22,7 +22,6 @@
-- Need bit. This allows for very efficient lookup of files needing handling
-- on this device, which is a common query.
CREATE TABLE IF NOT EXISTS files (
folder_idx INTEGER NOT NULL,
device_idx INTEGER NOT NULL, -- actual device ID or LocalDeviceID
sequence INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, -- our local database sequence, for each and every entry
remote_sequence INTEGER, -- remote device's sequence number, null for local or synthetic entries
@@ -35,8 +34,7 @@ CREATE TABLE IF NOT EXISTS files (
invalid INTEGER NOT NULL, -- boolean
local_flags INTEGER NOT NULL,
blocklist_hash BLOB, -- null when there are no blocks
FOREIGN KEY(device_idx) REFERENCES devices(idx) ON DELETE CASCADE,
FOREIGN KEY(folder_idx) REFERENCES folders(idx) ON DELETE CASCADE
FOREIGN KEY(device_idx) REFERENCES devices(idx) ON DELETE CASCADE
) STRICT
;
-- FileInfos store the actual protobuf object. We do this separately to keep
@@ -48,15 +46,15 @@ CREATE TABLE IF NOT EXISTS fileinfos (
) STRICT
;
-- There can be only one file per folder, device, and remote sequence number
CREATE UNIQUE INDEX IF NOT EXISTS files_remote_sequence ON files (folder_idx, device_idx, remote_sequence)
CREATE UNIQUE INDEX IF NOT EXISTS files_remote_sequence ON files (device_idx, remote_sequence)
WHERE remote_sequence IS NOT NULL
;
-- There can be only one file per folder, device, and name
CREATE UNIQUE INDEX IF NOT EXISTS files_device_name ON files (folder_idx, device_idx, name)
CREATE UNIQUE INDEX IF NOT EXISTS files_device_name ON files (device_idx, name)
;
-- We want to be able to look up & iterate files based on just folder and name
CREATE INDEX IF NOT EXISTS files_name_only ON files (folder_idx, name)
CREATE INDEX IF NOT EXISTS files_name_only ON files (name)
;
-- We want to be able to look up & iterate files based on blocks hash
CREATE INDEX IF NOT EXISTS files_blocklist_hash_only ON files (blocklist_hash, device_idx, folder_idx) WHERE blocklist_hash IS NOT NULL
CREATE INDEX IF NOT EXISTS files_blocklist_hash_only ON files (blocklist_hash, device_idx) WHERE blocklist_hash IS NOT NULL
;

View File

@@ -7,18 +7,16 @@
-- indexids holds the index ID and maximum sequence for a given device and folder
CREATE TABLE IF NOT EXISTS indexids (
device_idx INTEGER NOT NULL,
folder_idx INTEGER NOT NULL,
index_id TEXT NOT NULL COLLATE BINARY,
sequence INTEGER NOT NULL DEFAULT 0,
PRIMARY KEY(device_idx, folder_idx),
FOREIGN KEY(folder_idx) REFERENCES folders(idx) ON DELETE CASCADE,
PRIMARY KEY(device_idx),
FOREIGN KEY(device_idx) REFERENCES devices(idx) ON DELETE CASCADE
) STRICT, WITHOUT ROWID
;
CREATE TRIGGER IF NOT EXISTS indexids_seq AFTER INSERT ON files
BEGIN
INSERT INTO indexids (folder_idx, device_idx, index_id, sequence)
VALUES (NEW.folder_idx, NEW.device_idx, "", COALESCE(NEW.remote_sequence, NEW.sequence))
INSERT INTO indexids (device_idx, index_id, sequence)
VALUES (NEW.device_idx, "", COALESCE(NEW.remote_sequence, NEW.sequence))
ON CONFLICT DO UPDATE SET sequence = COALESCE(NEW.remote_sequence, NEW.sequence);
END
;

View File

@@ -9,16 +9,14 @@
-- Counts and sizes are maintained for each device, folder, type, flag bits
-- combination.
CREATE TABLE IF NOT EXISTS counts (
folder_idx INTEGER NOT NULL,
device_idx INTEGER NOT NULL,
type INTEGER NOT NULL,
local_flags INTEGER NOT NULL,
count INTEGER NOT NULL,
size INTEGER NOT NULL,
deleted INTEGER NOT NULL, -- boolean
PRIMARY KEY(folder_idx, device_idx, type, local_flags, deleted),
FOREIGN KEY(device_idx) REFERENCES devices(idx) ON DELETE CASCADE,
FOREIGN KEY(folder_idx) REFERENCES folders(idx) ON DELETE CASCADE
PRIMARY KEY(device_idx, type, local_flags, deleted),
FOREIGN KEY(device_idx) REFERENCES devices(idx) ON DELETE CASCADE
) STRICT, WITHOUT ROWID
;
@@ -26,28 +24,24 @@ CREATE TABLE IF NOT EXISTS counts (
CREATE TRIGGER IF NOT EXISTS counts_insert AFTER INSERT ON files
BEGIN
INSERT INTO counts (folder_idx, device_idx, type, local_flags, count, size, deleted)
VALUES (NEW.folder_idx, NEW.device_idx, NEW.type, NEW.local_flags, 1, NEW.size, NEW.deleted)
INSERT INTO counts (device_idx, type, local_flags, count, size, deleted)
VALUES (NEW.device_idx, NEW.type, NEW.local_flags, 1, NEW.size, NEW.deleted)
ON CONFLICT DO UPDATE SET count = count + 1, size = size + NEW.size;
END
;
CREATE TRIGGER IF NOT EXISTS counts_delete AFTER DELETE ON files
BEGIN
UPDATE counts SET count = count - 1, size = size - OLD.size
WHERE folder_idx = OLD.folder_idx AND device_idx = OLD.device_idx AND type = OLD.type AND local_flags = OLD.local_flags AND deleted = OLD.deleted;
WHERE device_idx = OLD.device_idx AND type = OLD.type AND local_flags = OLD.local_flags AND deleted = OLD.deleted;
END
;
CREATE TRIGGER IF NOT EXISTS counts_update AFTER UPDATE OF local_flags ON files
WHEN NEW.local_flags != OLD.local_flags
BEGIN
INSERT INTO counts (folder_idx, device_idx, type, local_flags, count, size, deleted)
VALUES (NEW.folder_idx, NEW.device_idx, NEW.type, NEW.local_flags, 1, NEW.size, NEW.deleted)
INSERT INTO counts (device_idx, type, local_flags, count, size, deleted)
VALUES (NEW.device_idx, NEW.type, NEW.local_flags, 1, NEW.size, NEW.deleted)
ON CONFLICT DO UPDATE SET count = count + 1, size = size + NEW.size;
UPDATE counts SET count = count - 1, size = size - OLD.size
WHERE folder_idx = OLD.folder_idx AND device_idx = OLD.device_idx AND type = OLD.type AND local_flags = OLD.local_flags AND deleted = OLD.deleted;
WHERE device_idx = OLD.device_idx AND type = OLD.type AND local_flags = OLD.local_flags AND deleted = OLD.deleted;
END
;
DROP TRIGGER IF EXISTS counts_update_add -- tmp migration
;
DROP TRIGGER IF EXISTS counts_update_del -- tmp migration
;

View File

@@ -6,11 +6,9 @@
--- Backing for the MtimeFS
CREATE TABLE IF NOT EXISTS mtimes (
folder_idx INTEGER NOT NULL,
name TEXT NOT NULL,
ondisk INTEGER NOT NULL, -- unix nanos
virtual INTEGER NOT NULL, -- unix nanos
PRIMARY KEY(folder_idx, name),
FOREIGN KEY(folder_idx) REFERENCES folders(idx) ON DELETE CASCADE
PRIMARY KEY(name)
) STRICT, WITHOUT ROWID
;

View File

@@ -0,0 +1,12 @@
-- Copyright (C) 2025 The Syncthing Authors.
--
-- This Source Code Form is subject to the terms of the Mozilla Public
-- License, v. 2.0. If a copy of the MPL was not distributed with this file,
-- You can obtain one at https://mozilla.org/MPL/2.0/.
-- folders map folder IDs as used by Syncthing to database folder indexes
CREATE TABLE IF NOT EXISTS folders (
idx INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
folder_id TEXT NOT NULL UNIQUE COLLATE BINARY
) STRICT
;

View File

@@ -48,7 +48,7 @@ const (
UserHomeBaseDir BaseDirEnum = "userHome"
levelDBDir = "index-v0.14.0.db"
databaseName = "index-v2.db"
databaseName = "index-v2"
configFileName = "config.xml"
defaultStateDir = ".local/state/syncthing"
oldDefaultConfigDir = ".config/syncthing"

View File

@@ -315,23 +315,15 @@ func (f *sendReceiveFolder) processNeeded(dbUpdateChan chan<- dbUpdateJob, copyC
fileDeletions := map[string]protocol.FileInfo{}
buckets := map[string][]protocol.FileInfo{}
// Buffer the full list of needed files. This is somewhat wasteful and
// uses a lot of memory, but we need to keep the duration of the
// database read short and not do a bunch of file and data I/O inside
// the loop. If we forego the ability for users to repriorize the pull
// queue on the fly we could do this in batches, though that would also
// be a bit slower and less efficient in other ways.
files, err := itererr.Collect(f.model.sdb.AllNeededGlobalFiles(f.folderID, protocol.LocalDeviceID, f.Order, 0, 0))
if err != nil {
return changed, nil, nil, err
}
// Iterate the list of items that we need and sort them into piles.
// Regular files to pull goes into the file queue, everything else
// (directories, symlinks and deletes) goes into the "process directly"
// pile.
loop:
for _, file := range files {
for file, err := range itererr.Zip(f.model.sdb.AllNeededGlobalFiles(f.folderID, protocol.LocalDeviceID, f.Order, 0, 0)) {
if err != nil {
return changed, nil, nil, err
}
select {
case <-f.ctx.Done():
break loop
@@ -1353,58 +1345,58 @@ func (f *sendReceiveFolder) copierRoutine(in <-chan copyBlocksState, pullChan ch
buf = protocol.BufferPool.Upgrade(buf, int(block.Size))
found := false
for e, err := range itererr.Zip(f.model.sdb.AllLocalBlocksWithHash(block.Hash)) {
blocks, _ := f.model.sdb.AllLocalBlocksWithHash(block.Hash)
for _, e := range blocks {
res, err := f.model.sdb.AllLocalFilesWithBlocksHashAnyFolder(e.BlocklistHash)
if err != nil {
break
continue
}
it, errFn := f.model.sdb.AllLocalFilesWithBlocksHashAnyFolder(e.BlocklistHash)
for folderID, fi := range it {
for folderID, files := range res {
ffs := folderFilesystems[folderID]
fd, err := ffs.Open(fi.Name)
if err != nil {
continue
}
defer fd.Close()
_, err = fd.ReadAt(buf, e.Offset)
if err != nil {
fd.Close()
continue
}
// Hash is not SHA256 as it's an encrypted hash token. In that
// case we can't verify the block integrity so we'll take it on
// trust. (The other side can and will verify.)
if f.Type != config.FolderTypeReceiveEncrypted {
if err := f.verifyBuffer(buf, block); err != nil {
l.Debugln("Finder failed to verify buffer", err)
for _, fi := range files {
fd, err := ffs.Open(fi.Name)
if err != nil {
continue
}
}
defer fd.Close()
if f.CopyRangeMethod != config.CopyRangeMethodStandard {
err = f.withLimiter(func() error {
dstFd.mut.Lock()
defer dstFd.mut.Unlock()
return fs.CopyRange(f.CopyRangeMethod.ToFS(), fd, dstFd.fd, e.Offset, block.Offset, int64(block.Size))
})
} else {
err = f.limitedWriteAt(dstFd, buf, block.Offset)
}
if err != nil {
state.fail(fmt.Errorf("dst write: %w", err))
_, err = fd.ReadAt(buf, e.Offset)
if err != nil {
fd.Close()
continue
}
// Hash is not SHA256 as it's an encrypted hash token. In that
// case we can't verify the block integrity so we'll take it on
// trust. (The other side can and will verify.)
if f.Type != config.FolderTypeReceiveEncrypted {
if err := f.verifyBuffer(buf, block); err != nil {
l.Debugln("Finder failed to verify buffer", err)
continue
}
}
if f.CopyRangeMethod != config.CopyRangeMethodStandard {
err = f.withLimiter(func() error {
dstFd.mut.Lock()
defer dstFd.mut.Unlock()
return fs.CopyRange(f.CopyRangeMethod.ToFS(), fd, dstFd.fd, e.Offset, block.Offset, int64(block.Size))
})
} else {
err = f.limitedWriteAt(dstFd, buf, block.Offset)
}
if err != nil {
state.fail(fmt.Errorf("dst write: %w", err))
break
}
if fi.Name == state.file.Name {
state.copiedFromOrigin(block.Size)
} else {
state.copiedFromElsewhere(block.Size)
}
found = true
break
}
if fi.Name == state.file.Name {
state.copiedFromOrigin(block.Size)
} else {
state.copiedFromElsewhere(block.Size)
}
found = true
break
}
if err := errFn(); err != nil {
l.Warnln(err)
}
}

View File

@@ -19,7 +19,6 @@ import (
"testing"
"time"
"github.com/syncthing/syncthing/internal/itererr"
"github.com/syncthing/syncthing/lib/build"
"github.com/syncthing/syncthing/lib/config"
"github.com/syncthing/syncthing/lib/events"
@@ -326,11 +325,11 @@ func TestCopierCleanup(t *testing.T) {
// Update index (removing old blocks)
f.updateLocalsFromScanning([]protocol.FileInfo{file})
if vals, err := itererr.Collect(m.sdb.AllLocalBlocksWithHash(blocks[0].Hash)); err != nil || len(vals) > 0 {
if vals, err := m.sdb.AllLocalBlocksWithHash(blocks[0].Hash); err != nil || len(vals) > 0 {
t.Error("Unexpected block found")
}
if vals, err := itererr.Collect(m.sdb.AllLocalBlocksWithHash(blocks[1].Hash)); err != nil || len(vals) == 0 {
if vals, err := m.sdb.AllLocalBlocksWithHash(blocks[1].Hash); err != nil || len(vals) == 0 {
t.Error("Expected block not found")
}
@@ -339,11 +338,11 @@ func TestCopierCleanup(t *testing.T) {
// Update index (removing old blocks)
f.updateLocalsFromScanning([]protocol.FileInfo{file})
if vals, err := itererr.Collect(m.sdb.AllLocalBlocksWithHash(blocks[0].Hash)); err != nil || len(vals) == 0 {
if vals, err := m.sdb.AllLocalBlocksWithHash(blocks[0].Hash); err != nil || len(vals) == 0 {
t.Error("Unexpected block found")
}
if vals, err := itererr.Collect(m.sdb.AllLocalBlocksWithHash(blocks[1].Hash)); err != nil || len(vals) > 0 {
if vals, err := m.sdb.AllLocalBlocksWithHash(blocks[1].Hash); err != nil || len(vals) > 0 {
t.Error("Expected block not found")
}
}