Files
syncthing/internal/db/sqlite/folderdb_update.go
Jakob Borg 6f0acacbd2 fix(sqlite): actually always insert blocks for local files (fixes #10388) (#10411)
Due to a thinko, this optimisation was wildly incorrect and would read
to lack of block reuse when syncing files.

(We do not insert a blocklist per device, but only a single one. We
can't use the fact of whether the insert happened as a criteria for
inserting blocks.)

Signed-off-by: Jakob Borg <jakob@kastelo.net>
2025-09-23 12:46:31 +00:00

570 lines
16 KiB
Go

// Copyright (C) 2025 The Syncthing Authors.
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
// You can obtain one at https://mozilla.org/MPL/2.0/.
package sqlite
import (
"cmp"
"context"
"fmt"
"log/slog"
"slices"
"github.com/jmoiron/sqlx"
"github.com/syncthing/syncthing/internal/gen/dbproto"
"github.com/syncthing/syncthing/internal/itererr"
"github.com/syncthing/syncthing/internal/slogutil"
"github.com/syncthing/syncthing/lib/osutil"
"github.com/syncthing/syncthing/lib/protocol"
"github.com/syncthing/syncthing/lib/sliceutil"
"google.golang.org/protobuf/proto"
)
const (
// Arbitrarily chosen values for checkpoint frequency....
updatePointsPerFile = 100
updatePointsPerBlock = 1
updatePointsThreshold = 250_000
)
func (s *folderDB) Update(device protocol.DeviceID, fs []protocol.FileInfo) error {
s.updateLock.Lock()
defer s.updateLock.Unlock()
deviceIdx, err := s.deviceIdxLocked(device)
if err != nil {
return wrap(err)
}
tx, err := s.sql.BeginTxx(context.Background(), nil)
if err != nil {
return wrap(err)
}
defer tx.Rollback() //nolint:errcheck
txp := &txPreparedStmts{Tx: tx}
//nolint:sqlclosecheck
insertNameStmt, err := txp.Preparex(`
INSERT INTO file_names(name)
VALUES (?)
ON CONFLICT(name) DO UPDATE
SET name = excluded.name
RETURNING idx
`)
if err != nil {
return wrap(err, "prepare insert name")
}
//nolint:sqlclosecheck
insertVersionStmt, err := txp.Preparex(`
INSERT INTO file_versions (version)
VALUES (?)
ON CONFLICT(version) DO UPDATE
SET version = excluded.version
RETURNING idx
`)
if err != nil {
return wrap(err, "prepare insert version")
}
//nolint:sqlclosecheck
insertFileStmt, err := txp.Preparex(`
INSERT OR REPLACE INTO files (device_idx, remote_sequence, type, modified, size, deleted, local_flags, blocklist_hash, name_idx, version_idx)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
RETURNING sequence
`)
if err != nil {
return wrap(err, "prepare insert file")
}
//nolint:sqlclosecheck
insertFileInfoStmt, err := txp.Preparex(`
INSERT INTO fileinfos (sequence, fiprotobuf)
VALUES (?, ?)
`)
if err != nil {
return wrap(err, "prepare insert fileinfo")
}
//nolint:sqlclosecheck
insertBlockListStmt, err := txp.Preparex(`
INSERT OR IGNORE INTO blocklists (blocklist_hash, blprotobuf)
VALUES (?, ?)
`)
if err != nil {
return wrap(err, "prepare insert blocklist")
}
var prevRemoteSeq int64
for i, f := range fs {
f.Name = osutil.NormalizedFilename(f.Name)
var blockshash *[]byte
if len(f.Blocks) > 0 {
f.BlocksHash = protocol.BlocksHash(f.Blocks)
blockshash = &f.BlocksHash
} else {
f.BlocksHash = nil
}
if f.Type == protocol.FileInfoTypeDirectory {
f.Size = 128 // synthetic directory size
}
// Insert the file.
//
// If it is a remote file, set remote_sequence otherwise leave it at
// null. Returns the new local sequence.
var remoteSeq *int64
if device != protocol.LocalDeviceID {
if i > 0 && f.Sequence == prevRemoteSeq {
return fmt.Errorf("duplicate remote sequence number %d", prevRemoteSeq)
}
prevRemoteSeq = f.Sequence
remoteSeq = &f.Sequence
}
var nameIdx int64
if err := insertNameStmt.Get(&nameIdx, f.Name); err != nil {
return wrap(err, "insert name")
}
var versionIdx int64
if err := insertVersionStmt.Get(&versionIdx, f.Version.String()); err != nil {
return wrap(err, "insert version")
}
var localSeq int64
if err := insertFileStmt.Get(&localSeq, deviceIdx, remoteSeq, f.Type, f.ModTime().UnixNano(), f.Size, f.IsDeleted(), f.LocalFlags, blockshash, nameIdx, versionIdx); err != nil {
return wrap(err, "insert file")
}
if len(f.Blocks) > 0 {
// Indirect the block list
blocks := sliceutil.Map(f.Blocks, protocol.BlockInfo.ToWire)
bs, err := proto.Marshal(&dbproto.BlockList{Blocks: blocks})
if err != nil {
return wrap(err, "marshal blocklist")
}
if _, err := insertBlockListStmt.Exec(f.BlocksHash, bs); err != nil {
return wrap(err, "insert blocklist")
} else if device == protocol.LocalDeviceID {
// Insert all blocks
if err := s.insertBlocksLocked(txp, f.BlocksHash, f.Blocks); err != nil {
return wrap(err, "insert blocks")
}
}
f.Blocks = nil
}
// Insert the fileinfo
if device == protocol.LocalDeviceID {
f.Sequence = localSeq
}
bs, err := proto.Marshal(f.ToWire(true))
if err != nil {
return wrap(err, "marshal fileinfo")
}
if _, err := insertFileInfoStmt.Exec(localSeq, bs); err != nil {
return wrap(err, "insert fileinfo")
}
// Update global and need
if err := s.recalcGlobalForFile(txp, f.Name); err != nil {
return wrap(err)
}
}
if err := tx.Commit(); err != nil {
return wrap(err)
}
s.periodicCheckpointLocked(fs)
return nil
}
func (s *folderDB) DropDevice(device protocol.DeviceID) error {
if device == protocol.LocalDeviceID {
panic("bug: cannot drop local device")
}
s.updateLock.Lock()
defer s.updateLock.Unlock()
tx, err := s.sql.BeginTxx(context.Background(), nil)
if err != nil {
return wrap(err)
}
defer tx.Rollback() //nolint:errcheck
txp := &txPreparedStmts{Tx: tx}
// Drop the device, which cascades to delete all files etc for it
if _, err := tx.Exec(`DELETE FROM devices WHERE device_id = ?`, device.String()); err != nil {
return wrap(err)
}
// Recalc the globals for all affected folders
if err := s.recalcGlobalForFolder(txp); err != nil {
return wrap(err)
}
return wrap(tx.Commit())
}
func (s *folderDB) DropAllFiles(device protocol.DeviceID) error {
s.updateLock.Lock()
defer s.updateLock.Unlock()
// This is a two part operation, first dropping all the files and then
// recalculating the global state for the entire folder.
deviceIdx, err := s.deviceIdxLocked(device)
if err != nil {
return wrap(err)
}
tx, err := s.sql.BeginTxx(context.Background(), nil)
if err != nil {
return wrap(err)
}
defer tx.Rollback() //nolint:errcheck
txp := &txPreparedStmts{Tx: tx}
// Drop all the file entries
result, err := tx.Exec(`
DELETE FROM files
WHERE device_idx = ?
`, deviceIdx)
if err != nil {
return wrap(err)
}
if n, err := result.RowsAffected(); err == nil && n == 0 {
// The delete affected no rows, so we don't need to redo the entire
// global/need calculation.
return wrap(tx.Commit())
}
// Recalc global for the entire folder
if err := s.recalcGlobalForFolder(txp); err != nil {
return wrap(err)
}
return wrap(tx.Commit())
}
func (s *folderDB) DropFilesNamed(device protocol.DeviceID, names []string) error {
for i := range names {
names[i] = osutil.NormalizedFilename(names[i])
}
s.updateLock.Lock()
defer s.updateLock.Unlock()
deviceIdx, err := s.deviceIdxLocked(device)
if err != nil {
return wrap(err)
}
tx, err := s.sql.BeginTxx(context.Background(), nil)
if err != nil {
return wrap(err)
}
defer tx.Rollback() //nolint:errcheck
txp := &txPreparedStmts{Tx: tx}
// Drop the named files
query, args, err := sqlx.In(`
DELETE FROM files
WHERE device_idx = ? AND name_idx IN (
SELECT idx FROM file_names WHERE name IN (?)
)
`, deviceIdx, names)
if err != nil {
return wrap(err)
}
if _, err := tx.Exec(query, args...); err != nil {
return wrap(err)
}
// Recalc globals for the named files
for _, name := range names {
if err := s.recalcGlobalForFile(txp, name); err != nil {
return wrap(err)
}
}
return wrap(tx.Commit())
}
func (*folderDB) insertBlocksLocked(tx *txPreparedStmts, blocklistHash []byte, blocks []protocol.BlockInfo) error {
if len(blocks) == 0 {
return nil
}
bs := make([]map[string]any, len(blocks))
for i, b := range blocks {
bs[i] = map[string]any{
"hash": b.Hash,
"blocklist_hash": blocklistHash,
"idx": i,
"offset": b.Offset,
"size": b.Size,
}
}
// Very large block lists (>8000 blocks) result in "too many variables"
// error. Chunk it to a reasonable size.
for chunk := range slices.Chunk(bs, 1000) {
if _, err := tx.NamedExec(`
INSERT OR IGNORE INTO blocks (hash, blocklist_hash, idx, offset, size)
VALUES (:hash, :blocklist_hash, :idx, :offset, :size)
`, chunk); err != nil {
return wrap(err)
}
}
return nil
}
func (s *folderDB) recalcGlobalForFolder(txp *txPreparedStmts) error {
// Select files where there is no global, those are the ones we need to
// recalculate.
//nolint:sqlclosecheck
namesStmt, err := txp.Preparex(`
SELECT n.name FROM files f
INNER JOIN file_names n ON n.idx = f.name_idx
WHERE NOT EXISTS (
SELECT 1 FROM files g
WHERE g.name_idx = f.name_idx AND g.local_flags & ? != 0
)
GROUP BY n.name
`)
if err != nil {
return wrap(err)
}
rows, err := namesStmt.Queryx(protocol.FlagLocalGlobal)
if err != nil {
return wrap(err)
}
defer rows.Close()
for rows.Next() {
var name string
if err := rows.Scan(&name); err != nil {
return wrap(err)
}
if err := s.recalcGlobalForFile(txp, name); err != nil {
return wrap(err)
}
}
return wrap(rows.Err())
}
func (s *folderDB) recalcGlobalForFile(txp *txPreparedStmts, file string) error {
//nolint:sqlclosecheck
selStmt, err := txp.Preparex(`
SELECT n.name, f.device_idx, f.sequence, f.modified, v.version, f.deleted, f.local_flags FROM files f
INNER JOIN file_versions v ON v.idx = f.version_idx
INNER JOIN file_names n ON n.idx = f.name_idx
WHERE n.name = ?
`)
if err != nil {
return wrap(err, "prepare select")
}
es, err := itererr.Collect(iterStructs[fileRow](selStmt.Queryx(file)))
if err != nil {
return wrap(err)
}
if len(es) == 0 {
// shouldn't happen
return nil
}
// Sort the entries; the global entry is at the head of the list
slices.SortFunc(es, fileRow.Compare)
// The global version is the first one in the list that is not invalid,
// or just the first one in the list if all are invalid.
var global fileRow
globIdx := slices.IndexFunc(es, func(e fileRow) bool { return !e.IsInvalid() })
if globIdx < 0 {
globIdx = 0
}
global = es[globIdx]
// We "have" the file if the position in the list of versions is at the
// global version or better, or if the version is the same as the global
// file (we might be further down the list due to invalid flags), or if
// the global is deleted and we don't have it at all...
localIdx := slices.IndexFunc(es, func(e fileRow) bool { return e.DeviceIdx == s.localDeviceIdx })
hasLocal := localIdx >= 0 && localIdx <= globIdx || // have a better or equal version
localIdx >= 0 && es[localIdx].Version.Equal(global.Version.Vector) || // have an equal version but invalid/ignored
localIdx < 0 && global.Deleted // missing it, but the global is also deleted
// Set the global flag on the global entry. Set the need flag if the
// local device needs this file, unless it's invalid.
global.LocalFlags |= protocol.FlagLocalGlobal
if hasLocal || global.IsInvalid() {
global.LocalFlags &= ^protocol.FlagLocalNeeded
} else {
global.LocalFlags |= protocol.FlagLocalNeeded
}
//nolint:sqlclosecheck
upStmt, err := txp.Preparex(`
UPDATE files SET local_flags = ?
WHERE device_idx = ? AND sequence = ?
`)
if err != nil {
return wrap(err)
}
if _, err := upStmt.Exec(global.LocalFlags, global.DeviceIdx, global.Sequence); err != nil {
return wrap(err)
}
// Clear the need and global flags on all other entries
//nolint:sqlclosecheck
upStmt, err = txp.Preparex(`
UPDATE files SET local_flags = local_flags & ?
WHERE name_idx = (SELECT idx FROM file_names WHERE name = ?) AND sequence != ? AND local_flags & ? != 0
`)
if err != nil {
return wrap(err, "prepare update")
}
if _, err := upStmt.Exec(^(protocol.FlagLocalNeeded | protocol.FlagLocalGlobal), global.Name, global.Sequence, protocol.FlagLocalNeeded|protocol.FlagLocalGlobal); err != nil {
return wrap(err)
}
return nil
}
func (s *DB) folderIdxLocked(folderID string) (int64, error) {
if _, err := s.stmt(`
INSERT OR IGNORE INTO folders(folder_id)
VALUES (?)
`).Exec(folderID); err != nil {
return 0, wrap(err)
}
var idx int64
if err := s.stmt(`
SELECT idx FROM folders
WHERE folder_id = ?
`).Get(&idx, folderID); err != nil {
return 0, wrap(err)
}
return idx, nil
}
type fileRow struct {
Name string
Version dbVector
DeviceIdx int64 `db:"device_idx"`
Sequence int64
Modified int64
Size int64
LocalFlags protocol.FlagLocal `db:"local_flags"`
Deleted bool
}
func (e fileRow) Compare(other fileRow) int {
// From FileInfo.WinsConflict
vc := e.Version.Compare(other.Version.Vector)
switch vc {
case protocol.Equal:
if e.IsInvalid() != other.IsInvalid() {
if e.IsInvalid() {
return 1
}
return -1
}
// Compare the device ID index, lower is better. This is only
// deterministic to the extent that LocalDeviceID will always be the
// lowest one, order between remote devices is random (and
// irrelevant).
return cmp.Compare(e.DeviceIdx, other.DeviceIdx)
case protocol.Greater: // we are newer
return -1
case protocol.Lesser: // we are older
return 1
case protocol.ConcurrentGreater, protocol.ConcurrentLesser: // there is a conflict
if e.IsInvalid() != other.IsInvalid() {
if e.IsInvalid() { // we are invalid, we lose
return 1
}
return -1 // they are invalid, we win
}
if d := cmp.Compare(e.Modified, other.Modified); d != 0 {
return -d // positive d means we were newer, so we win (negative return)
}
if vc == protocol.ConcurrentGreater {
return -1 // we have a better device ID, we win
}
return 1 // they win
default:
return 0
}
}
func (e fileRow) IsInvalid() bool {
return e.LocalFlags.IsInvalid()
}
func (s *folderDB) periodicCheckpointLocked(fs []protocol.FileInfo) {
// Induce periodic checkpoints. We add points for each file and block,
// and checkpoint when we've written more than a threshold of points.
// This ensures we do not go too long without a checkpoint, while also
// not doing it incessantly for every update.
s.updatePoints += updatePointsPerFile * len(fs)
for _, f := range fs {
s.updatePoints += len(f.Blocks) * updatePointsPerBlock
}
if s.updatePoints > updatePointsThreshold {
conn, err := s.sql.Conn(context.Background())
if err != nil {
slog.Debug("Connection error", slog.String("db", s.baseName), slogutil.Error(err))
return
}
defer conn.Close()
if _, err := conn.ExecContext(context.Background(), `PRAGMA journal_size_limit = 8388608`); err != nil {
slog.Debug("PRAGMA journal_size_limit error", slog.String("db", s.baseName), slogutil.Error(err))
}
// Every 50th checkpoint becomes a truncate, in an effort to bring
// down the size now and then.
checkpointType := "RESTART"
if s.checkpointsCount > 50 {
checkpointType = "TRUNCATE"
}
cmd := fmt.Sprintf(`PRAGMA wal_checkpoint(%s)`, checkpointType)
row := conn.QueryRowContext(context.Background(), cmd)
var res, modified, moved int
if row.Err() != nil {
slog.Debug("Command error", slog.String("db", s.baseName), slog.String("cmd", cmd), slogutil.Error(err))
} else if err := row.Scan(&res, &modified, &moved); err != nil {
slog.Debug("Command scan error", slog.String("db", s.baseName), slog.String("cmd", cmd), slogutil.Error(err))
} else {
slog.Debug("Checkpoint result", "db", s.baseName, "checkpointscount", s.checkpointsCount, "updatepoints", s.updatePoints, "res", res, "modified", modified, "moved", moved)
}
// Reset the truncate counter when a truncate succeeded. If it
// failed, we'll keep trying it until we succeed. Increase it faster
// when we fail to checkpoint, as it's more likely the WAL is
// growing and will need truncation when we get out of this state.
switch {
case res == 1:
s.checkpointsCount += 10
case res == 0 && checkpointType == "TRUNCATE":
s.checkpointsCount = 0
default:
s.checkpointsCount++
}
s.updatePoints = 0
}
}