mirror of
https://github.com/syncthing/syncthing.git
synced 2025-12-23 22:18:14 -05:00
This changes the files table to use normalisation for the names and
versions. The idea is that these are often common between all remote
devices, and repeating an integer is more efficient than repeating a
long string. A new benchmark bears this out; for a database with 100k
files shared between 31 devices, with some worst case assumption on
version vector size, the database is reduced in size by 50% and the test
finishes quicker:
Current:
db_bench_test.go:322: Total size: 6263.70 MiB
--- PASS: TestBenchmarkSizeManyFilesRemotes (1084.89s)
New:
db_bench_test.go:326: Total size: 3049.95 MiB
--- PASS: TestBenchmarkSizeManyFilesRemotes (776.97s)
The other benchmarks end up about the same within the margin of
variability, with one possible exception being that RemoteNeed seems to
be a little slower on average:
old files/s new files/s
Update/n=RemoteNeed/size=1000-8 5.051k 4.654k
Update/n=RemoteNeed/size=2000-8 5.201k 4.384k
Update/n=RemoteNeed/size=4000-8 4.943k 4.242k
Update/n=RemoteNeed/size=8000-8 5.099k 3.527k
Update/n=RemoteNeed/size=16000-8 3.686k 3.847k
Update/n=RemoteNeed/size=30000-8 4.456k 3.482k
I'm not sure why; possibly that query can be optimised anyhow.
Signed-off-by: Jakob Borg <jakob@kastelo.net>
570 lines
16 KiB
Go
570 lines
16 KiB
Go
// Copyright (C) 2025 The Syncthing Authors.
|
|
//
|
|
// This Source Code Form is subject to the terms of the Mozilla Public
|
|
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
|
|
// You can obtain one at https://mozilla.org/MPL/2.0/.
|
|
|
|
package sqlite
|
|
|
|
import (
|
|
"cmp"
|
|
"context"
|
|
"fmt"
|
|
"log/slog"
|
|
"slices"
|
|
|
|
"github.com/jmoiron/sqlx"
|
|
"github.com/syncthing/syncthing/internal/gen/dbproto"
|
|
"github.com/syncthing/syncthing/internal/itererr"
|
|
"github.com/syncthing/syncthing/internal/slogutil"
|
|
"github.com/syncthing/syncthing/lib/osutil"
|
|
"github.com/syncthing/syncthing/lib/protocol"
|
|
"github.com/syncthing/syncthing/lib/sliceutil"
|
|
"google.golang.org/protobuf/proto"
|
|
)
|
|
|
|
// Tuning values for the periodic checkpointing performed after updates.
// The values are arbitrarily chosen.
const (
	// updatePointsPerFile is the number of points added for each file in
	// an update batch.
	updatePointsPerFile = 100
	// updatePointsPerBlock is the number of points added for each block of
	// each file in an update batch.
	updatePointsPerBlock = 1
	// updatePointsThreshold is the accumulated number of points above
	// which a checkpoint is attempted.
	updatePointsThreshold = 250_000
)
|
|
|
|
func (s *folderDB) Update(device protocol.DeviceID, fs []protocol.FileInfo) error {
|
|
s.updateLock.Lock()
|
|
defer s.updateLock.Unlock()
|
|
|
|
deviceIdx, err := s.deviceIdxLocked(device)
|
|
if err != nil {
|
|
return wrap(err)
|
|
}
|
|
|
|
tx, err := s.sql.BeginTxx(context.Background(), nil)
|
|
if err != nil {
|
|
return wrap(err)
|
|
}
|
|
defer tx.Rollback() //nolint:errcheck
|
|
txp := &txPreparedStmts{Tx: tx}
|
|
|
|
//nolint:sqlclosecheck
|
|
insertNameStmt, err := txp.Preparex(`
|
|
INSERT INTO file_names(name)
|
|
VALUES (?)
|
|
ON CONFLICT(name) DO UPDATE
|
|
SET name = excluded.name
|
|
RETURNING idx
|
|
`)
|
|
if err != nil {
|
|
return wrap(err, "prepare insert name")
|
|
}
|
|
|
|
//nolint:sqlclosecheck
|
|
insertVersionStmt, err := txp.Preparex(`
|
|
INSERT INTO file_versions (version)
|
|
VALUES (?)
|
|
ON CONFLICT(version) DO UPDATE
|
|
SET version = excluded.version
|
|
RETURNING idx
|
|
`)
|
|
if err != nil {
|
|
return wrap(err, "prepare insert version")
|
|
}
|
|
|
|
//nolint:sqlclosecheck
|
|
insertFileStmt, err := txp.Preparex(`
|
|
INSERT OR REPLACE INTO files (device_idx, remote_sequence, type, modified, size, deleted, local_flags, blocklist_hash, name_idx, version_idx)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
|
RETURNING sequence
|
|
`)
|
|
if err != nil {
|
|
return wrap(err, "prepare insert file")
|
|
}
|
|
|
|
//nolint:sqlclosecheck
|
|
insertFileInfoStmt, err := txp.Preparex(`
|
|
INSERT INTO fileinfos (sequence, fiprotobuf)
|
|
VALUES (?, ?)
|
|
`)
|
|
if err != nil {
|
|
return wrap(err, "prepare insert fileinfo")
|
|
}
|
|
|
|
//nolint:sqlclosecheck
|
|
insertBlockListStmt, err := txp.Preparex(`
|
|
INSERT OR IGNORE INTO blocklists (blocklist_hash, blprotobuf)
|
|
VALUES (?, ?)
|
|
`)
|
|
if err != nil {
|
|
return wrap(err, "prepare insert blocklist")
|
|
}
|
|
|
|
var prevRemoteSeq int64
|
|
for i, f := range fs {
|
|
f.Name = osutil.NormalizedFilename(f.Name)
|
|
|
|
var blockshash *[]byte
|
|
if len(f.Blocks) > 0 {
|
|
f.BlocksHash = protocol.BlocksHash(f.Blocks)
|
|
blockshash = &f.BlocksHash
|
|
} else {
|
|
f.BlocksHash = nil
|
|
}
|
|
|
|
if f.Type == protocol.FileInfoTypeDirectory {
|
|
f.Size = 128 // synthetic directory size
|
|
}
|
|
|
|
// Insert the file.
|
|
//
|
|
// If it is a remote file, set remote_sequence otherwise leave it at
|
|
// null. Returns the new local sequence.
|
|
var remoteSeq *int64
|
|
if device != protocol.LocalDeviceID {
|
|
if i > 0 && f.Sequence == prevRemoteSeq {
|
|
return fmt.Errorf("duplicate remote sequence number %d", prevRemoteSeq)
|
|
}
|
|
prevRemoteSeq = f.Sequence
|
|
remoteSeq = &f.Sequence
|
|
}
|
|
|
|
var nameIdx int64
|
|
if err := insertNameStmt.Get(&nameIdx, f.Name); err != nil {
|
|
return wrap(err, "insert name")
|
|
}
|
|
|
|
var versionIdx int64
|
|
if err := insertVersionStmt.Get(&versionIdx, f.Version.String()); err != nil {
|
|
return wrap(err, "insert version")
|
|
}
|
|
|
|
var localSeq int64
|
|
if err := insertFileStmt.Get(&localSeq, deviceIdx, remoteSeq, f.Type, f.ModTime().UnixNano(), f.Size, f.IsDeleted(), f.LocalFlags, blockshash, nameIdx, versionIdx); err != nil {
|
|
return wrap(err, "insert file")
|
|
}
|
|
|
|
if len(f.Blocks) > 0 {
|
|
// Indirect the block list
|
|
blocks := sliceutil.Map(f.Blocks, protocol.BlockInfo.ToWire)
|
|
bs, err := proto.Marshal(&dbproto.BlockList{Blocks: blocks})
|
|
if err != nil {
|
|
return wrap(err, "marshal blocklist")
|
|
}
|
|
if res, err := insertBlockListStmt.Exec(f.BlocksHash, bs); err != nil {
|
|
return wrap(err, "insert blocklist")
|
|
} else if aff, _ := res.RowsAffected(); aff != 0 && device == protocol.LocalDeviceID {
|
|
// Insert all blocks
|
|
if err := s.insertBlocksLocked(txp, f.BlocksHash, f.Blocks); err != nil {
|
|
return wrap(err, "insert blocks")
|
|
}
|
|
}
|
|
|
|
f.Blocks = nil
|
|
}
|
|
|
|
// Insert the fileinfo
|
|
if device == protocol.LocalDeviceID {
|
|
f.Sequence = localSeq
|
|
}
|
|
bs, err := proto.Marshal(f.ToWire(true))
|
|
if err != nil {
|
|
return wrap(err, "marshal fileinfo")
|
|
}
|
|
if _, err := insertFileInfoStmt.Exec(localSeq, bs); err != nil {
|
|
return wrap(err, "insert fileinfo")
|
|
}
|
|
|
|
// Update global and need
|
|
if err := s.recalcGlobalForFile(txp, f.Name); err != nil {
|
|
return wrap(err)
|
|
}
|
|
}
|
|
|
|
if err := tx.Commit(); err != nil {
|
|
return wrap(err)
|
|
}
|
|
|
|
s.periodicCheckpointLocked(fs)
|
|
return nil
|
|
}
|
|
|
|
func (s *folderDB) DropDevice(device protocol.DeviceID) error {
|
|
if device == protocol.LocalDeviceID {
|
|
panic("bug: cannot drop local device")
|
|
}
|
|
|
|
s.updateLock.Lock()
|
|
defer s.updateLock.Unlock()
|
|
|
|
tx, err := s.sql.BeginTxx(context.Background(), nil)
|
|
if err != nil {
|
|
return wrap(err)
|
|
}
|
|
defer tx.Rollback() //nolint:errcheck
|
|
txp := &txPreparedStmts{Tx: tx}
|
|
|
|
// Drop the device, which cascades to delete all files etc for it
|
|
if _, err := tx.Exec(`DELETE FROM devices WHERE device_id = ?`, device.String()); err != nil {
|
|
return wrap(err)
|
|
}
|
|
|
|
// Recalc the globals for all affected folders
|
|
if err := s.recalcGlobalForFolder(txp); err != nil {
|
|
return wrap(err)
|
|
}
|
|
|
|
return wrap(tx.Commit())
|
|
}
|
|
|
|
func (s *folderDB) DropAllFiles(device protocol.DeviceID) error {
|
|
s.updateLock.Lock()
|
|
defer s.updateLock.Unlock()
|
|
|
|
// This is a two part operation, first dropping all the files and then
|
|
// recalculating the global state for the entire folder.
|
|
|
|
deviceIdx, err := s.deviceIdxLocked(device)
|
|
if err != nil {
|
|
return wrap(err)
|
|
}
|
|
|
|
tx, err := s.sql.BeginTxx(context.Background(), nil)
|
|
if err != nil {
|
|
return wrap(err)
|
|
}
|
|
defer tx.Rollback() //nolint:errcheck
|
|
txp := &txPreparedStmts{Tx: tx}
|
|
|
|
// Drop all the file entries
|
|
|
|
result, err := tx.Exec(`
|
|
DELETE FROM files
|
|
WHERE device_idx = ?
|
|
`, deviceIdx)
|
|
if err != nil {
|
|
return wrap(err)
|
|
}
|
|
if n, err := result.RowsAffected(); err == nil && n == 0 {
|
|
// The delete affected no rows, so we don't need to redo the entire
|
|
// global/need calculation.
|
|
return wrap(tx.Commit())
|
|
}
|
|
|
|
// Recalc global for the entire folder
|
|
|
|
if err := s.recalcGlobalForFolder(txp); err != nil {
|
|
return wrap(err)
|
|
}
|
|
return wrap(tx.Commit())
|
|
}
|
|
|
|
func (s *folderDB) DropFilesNamed(device protocol.DeviceID, names []string) error {
|
|
for i := range names {
|
|
names[i] = osutil.NormalizedFilename(names[i])
|
|
}
|
|
|
|
s.updateLock.Lock()
|
|
defer s.updateLock.Unlock()
|
|
|
|
deviceIdx, err := s.deviceIdxLocked(device)
|
|
if err != nil {
|
|
return wrap(err)
|
|
}
|
|
|
|
tx, err := s.sql.BeginTxx(context.Background(), nil)
|
|
if err != nil {
|
|
return wrap(err)
|
|
}
|
|
defer tx.Rollback() //nolint:errcheck
|
|
txp := &txPreparedStmts{Tx: tx}
|
|
|
|
// Drop the named files
|
|
|
|
query, args, err := sqlx.In(`
|
|
DELETE FROM files
|
|
WHERE device_idx = ? AND name_idx IN (
|
|
SELECT idx FROM file_names WHERE name IN (?)
|
|
)
|
|
`, deviceIdx, names)
|
|
if err != nil {
|
|
return wrap(err)
|
|
}
|
|
if _, err := tx.Exec(query, args...); err != nil {
|
|
return wrap(err)
|
|
}
|
|
|
|
// Recalc globals for the named files
|
|
|
|
for _, name := range names {
|
|
if err := s.recalcGlobalForFile(txp, name); err != nil {
|
|
return wrap(err)
|
|
}
|
|
}
|
|
|
|
return wrap(tx.Commit())
|
|
}
|
|
|
|
func (*folderDB) insertBlocksLocked(tx *txPreparedStmts, blocklistHash []byte, blocks []protocol.BlockInfo) error {
|
|
if len(blocks) == 0 {
|
|
return nil
|
|
}
|
|
bs := make([]map[string]any, len(blocks))
|
|
for i, b := range blocks {
|
|
bs[i] = map[string]any{
|
|
"hash": b.Hash,
|
|
"blocklist_hash": blocklistHash,
|
|
"idx": i,
|
|
"offset": b.Offset,
|
|
"size": b.Size,
|
|
}
|
|
}
|
|
|
|
// Very large block lists (>8000 blocks) result in "too many variables"
|
|
// error. Chunk it to a reasonable size.
|
|
for chunk := range slices.Chunk(bs, 1000) {
|
|
if _, err := tx.NamedExec(`
|
|
INSERT OR IGNORE INTO blocks (hash, blocklist_hash, idx, offset, size)
|
|
VALUES (:hash, :blocklist_hash, :idx, :offset, :size)
|
|
`, chunk); err != nil {
|
|
return wrap(err)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (s *folderDB) recalcGlobalForFolder(txp *txPreparedStmts) error {
|
|
// Select files where there is no global, those are the ones we need to
|
|
// recalculate.
|
|
//nolint:sqlclosecheck
|
|
namesStmt, err := txp.Preparex(`
|
|
SELECT n.name FROM files f
|
|
INNER JOIN file_names n ON n.idx = f.name_idx
|
|
WHERE NOT EXISTS (
|
|
SELECT 1 FROM files g
|
|
WHERE g.name_idx = f.name_idx AND g.local_flags & ? != 0
|
|
)
|
|
GROUP BY n.name
|
|
`)
|
|
if err != nil {
|
|
return wrap(err)
|
|
}
|
|
rows, err := namesStmt.Queryx(protocol.FlagLocalGlobal)
|
|
if err != nil {
|
|
return wrap(err)
|
|
}
|
|
defer rows.Close()
|
|
for rows.Next() {
|
|
var name string
|
|
if err := rows.Scan(&name); err != nil {
|
|
return wrap(err)
|
|
}
|
|
if err := s.recalcGlobalForFile(txp, name); err != nil {
|
|
return wrap(err)
|
|
}
|
|
}
|
|
return wrap(rows.Err())
|
|
}
|
|
|
|
func (s *folderDB) recalcGlobalForFile(txp *txPreparedStmts, file string) error {
|
|
//nolint:sqlclosecheck
|
|
selStmt, err := txp.Preparex(`
|
|
SELECT n.name, f.device_idx, f.sequence, f.modified, v.version, f.deleted, f.local_flags FROM files f
|
|
INNER JOIN file_versions v ON v.idx = f.version_idx
|
|
INNER JOIN file_names n ON n.idx = f.name_idx
|
|
WHERE n.name = ?
|
|
`)
|
|
if err != nil {
|
|
return wrap(err, "prepare select")
|
|
}
|
|
es, err := itererr.Collect(iterStructs[fileRow](selStmt.Queryx(file)))
|
|
if err != nil {
|
|
return wrap(err)
|
|
}
|
|
if len(es) == 0 {
|
|
// shouldn't happen
|
|
return nil
|
|
}
|
|
|
|
// Sort the entries; the global entry is at the head of the list
|
|
slices.SortFunc(es, fileRow.Compare)
|
|
|
|
// The global version is the first one in the list that is not invalid,
|
|
// or just the first one in the list if all are invalid.
|
|
var global fileRow
|
|
globIdx := slices.IndexFunc(es, func(e fileRow) bool { return !e.IsInvalid() })
|
|
if globIdx < 0 {
|
|
globIdx = 0
|
|
}
|
|
global = es[globIdx]
|
|
|
|
// We "have" the file if the position in the list of versions is at the
|
|
// global version or better, or if the version is the same as the global
|
|
// file (we might be further down the list due to invalid flags), or if
|
|
// the global is deleted and we don't have it at all...
|
|
localIdx := slices.IndexFunc(es, func(e fileRow) bool { return e.DeviceIdx == s.localDeviceIdx })
|
|
hasLocal := localIdx >= 0 && localIdx <= globIdx || // have a better or equal version
|
|
localIdx >= 0 && es[localIdx].Version.Equal(global.Version.Vector) || // have an equal version but invalid/ignored
|
|
localIdx < 0 && global.Deleted // missing it, but the global is also deleted
|
|
|
|
// Set the global flag on the global entry. Set the need flag if the
|
|
// local device needs this file, unless it's invalid.
|
|
global.LocalFlags |= protocol.FlagLocalGlobal
|
|
if hasLocal || global.IsInvalid() {
|
|
global.LocalFlags &= ^protocol.FlagLocalNeeded
|
|
} else {
|
|
global.LocalFlags |= protocol.FlagLocalNeeded
|
|
}
|
|
//nolint:sqlclosecheck
|
|
upStmt, err := txp.Preparex(`
|
|
UPDATE files SET local_flags = ?
|
|
WHERE device_idx = ? AND sequence = ?
|
|
`)
|
|
if err != nil {
|
|
return wrap(err)
|
|
}
|
|
if _, err := upStmt.Exec(global.LocalFlags, global.DeviceIdx, global.Sequence); err != nil {
|
|
return wrap(err)
|
|
}
|
|
|
|
// Clear the need and global flags on all other entries
|
|
//nolint:sqlclosecheck
|
|
upStmt, err = txp.Preparex(`
|
|
UPDATE files SET local_flags = local_flags & ?
|
|
WHERE name_idx = (SELECT idx FROM file_names WHERE name = ?) AND sequence != ? AND local_flags & ? != 0
|
|
`)
|
|
if err != nil {
|
|
return wrap(err, "prepare update")
|
|
}
|
|
if _, err := upStmt.Exec(^(protocol.FlagLocalNeeded | protocol.FlagLocalGlobal), global.Name, global.Sequence, protocol.FlagLocalNeeded|protocol.FlagLocalGlobal); err != nil {
|
|
return wrap(err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *DB) folderIdxLocked(folderID string) (int64, error) {
|
|
if _, err := s.stmt(`
|
|
INSERT OR IGNORE INTO folders(folder_id)
|
|
VALUES (?)
|
|
`).Exec(folderID); err != nil {
|
|
return 0, wrap(err)
|
|
}
|
|
var idx int64
|
|
if err := s.stmt(`
|
|
SELECT idx FROM folders
|
|
WHERE folder_id = ?
|
|
`).Get(&idx, folderID); err != nil {
|
|
return 0, wrap(err)
|
|
}
|
|
|
|
return idx, nil
|
|
}
|
|
|
|
type fileRow struct {
|
|
Name string
|
|
Version dbVector
|
|
DeviceIdx int64 `db:"device_idx"`
|
|
Sequence int64
|
|
Modified int64
|
|
Size int64
|
|
LocalFlags protocol.FlagLocal `db:"local_flags"`
|
|
Deleted bool
|
|
}
|
|
|
|
func (e fileRow) Compare(other fileRow) int {
|
|
// From FileInfo.WinsConflict
|
|
vc := e.Version.Compare(other.Version.Vector)
|
|
switch vc {
|
|
case protocol.Equal:
|
|
if e.IsInvalid() != other.IsInvalid() {
|
|
if e.IsInvalid() {
|
|
return 1
|
|
}
|
|
return -1
|
|
}
|
|
|
|
// Compare the device ID index, lower is better. This is only
|
|
// deterministic to the extent that LocalDeviceID will always be the
|
|
// lowest one, order between remote devices is random (and
|
|
// irrelevant).
|
|
return cmp.Compare(e.DeviceIdx, other.DeviceIdx)
|
|
case protocol.Greater: // we are newer
|
|
return -1
|
|
case protocol.Lesser: // we are older
|
|
return 1
|
|
case protocol.ConcurrentGreater, protocol.ConcurrentLesser: // there is a conflict
|
|
if e.IsInvalid() != other.IsInvalid() {
|
|
if e.IsInvalid() { // we are invalid, we lose
|
|
return 1
|
|
}
|
|
return -1 // they are invalid, we win
|
|
}
|
|
if d := cmp.Compare(e.Modified, other.Modified); d != 0 {
|
|
return -d // positive d means we were newer, so we win (negative return)
|
|
}
|
|
if vc == protocol.ConcurrentGreater {
|
|
return -1 // we have a better device ID, we win
|
|
}
|
|
return 1 // they win
|
|
default:
|
|
return 0
|
|
}
|
|
}
|
|
|
|
func (e fileRow) IsInvalid() bool {
|
|
return e.LocalFlags.IsInvalid()
|
|
}
|
|
|
|
func (s *folderDB) periodicCheckpointLocked(fs []protocol.FileInfo) {
|
|
// Induce periodic checkpoints. We add points for each file and block,
|
|
// and checkpoint when we've written more than a threshold of points.
|
|
// This ensures we do not go too long without a checkpoint, while also
|
|
// not doing it incessantly for every update.
|
|
s.updatePoints += updatePointsPerFile * len(fs)
|
|
for _, f := range fs {
|
|
s.updatePoints += len(f.Blocks) * updatePointsPerBlock
|
|
}
|
|
if s.updatePoints > updatePointsThreshold {
|
|
conn, err := s.sql.Conn(context.Background())
|
|
if err != nil {
|
|
slog.Debug("Connection error", slog.String("db", s.baseName), slogutil.Error(err))
|
|
return
|
|
}
|
|
defer conn.Close()
|
|
if _, err := conn.ExecContext(context.Background(), `PRAGMA journal_size_limit = 8388608`); err != nil {
|
|
slog.Debug("PRAGMA journal_size_limit error", slog.String("db", s.baseName), slogutil.Error(err))
|
|
}
|
|
|
|
// Every 50th checkpoint becomes a truncate, in an effort to bring
|
|
// down the size now and then.
|
|
checkpointType := "RESTART"
|
|
if s.checkpointsCount > 50 {
|
|
checkpointType = "TRUNCATE"
|
|
}
|
|
cmd := fmt.Sprintf(`PRAGMA wal_checkpoint(%s)`, checkpointType)
|
|
row := conn.QueryRowContext(context.Background(), cmd)
|
|
|
|
var res, modified, moved int
|
|
if row.Err() != nil {
|
|
slog.Debug("Command error", slog.String("db", s.baseName), slog.String("cmd", cmd), slogutil.Error(err))
|
|
} else if err := row.Scan(&res, &modified, &moved); err != nil {
|
|
slog.Debug("Command scan error", slog.String("db", s.baseName), slog.String("cmd", cmd), slogutil.Error(err))
|
|
} else {
|
|
slog.Debug("Checkpoint result", "db", s.baseName, "checkpointscount", s.checkpointsCount, "updatepoints", s.updatePoints, "res", res, "modified", modified, "moved", moved)
|
|
}
|
|
|
|
// Reset the truncate counter when a truncate succeeded. If it
|
|
// failed, we'll keep trying it until we succeed. Increase it faster
|
|
// when we fail to checkpoint, as it's more likely the WAL is
|
|
// growing and will need truncation when we get out of this state.
|
|
switch {
|
|
case res == 1:
|
|
s.checkpointsCount += 10
|
|
case res == 0 && checkpointType == "TRUNCATE":
|
|
s.checkpointsCount = 0
|
|
default:
|
|
s.checkpointsCount++
|
|
}
|
|
s.updatePoints = 0
|
|
}
|
|
}
|