// Mirror of https://github.com/syncthing/syncthing.git
// (synced 2026-03-26 10:11:30 -04:00).
//
// Also adds a method to query the last database maintenance time.
// Signed-off-by: Tommy van der Vorst <tommy@pixelspark.nl>
// Co-authored-by: Jakob Borg <jakob@kastelo.net>
//
// 372 lines, 10 KiB, Go.

// Copyright (C) 2025 The Syncthing Authors.
|
|
//
|
|
// This Source Code Form is subject to the terms of the Mozilla Public
|
|
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
|
|
// You can obtain one at https://mozilla.org/MPL/2.0/.
|
|
|
|
package sqlite
|
|
|
|
import (
|
|
"context"
|
|
"encoding/binary"
|
|
"fmt"
|
|
"log/slog"
|
|
"math/rand"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/jmoiron/sqlx"
|
|
"github.com/syncthing/syncthing/internal/db"
|
|
"github.com/syncthing/syncthing/internal/slogutil"
|
|
"github.com/syncthing/syncthing/lib/protocol"
|
|
)
|
|
|
|
const (
	// internalMetaPrefix is the prefix handed to db.NewTyped for the
	// service's own bookkeeping values.
	internalMetaPrefix = "dbsvc"
	// lastMaintKey is the key under which the time of the last completed
	// maintenance run is stored.
	lastMaintKey = "lastMaint"
	// lastSuccessfulGCSeqKey is the per-folder key holding the local device
	// sequence number at the time of the last successful GC.
	lastSuccessfulGCSeqKey = "lastSuccessfulGCSeq"

	// gcMinChunks is the minimum number of chunks a table is split into
	// when garbage collecting.
	gcMinChunks = 5
	// gcChunkSize is the approximate number of rows to process in a single
	// gc query.
	gcChunkSize = 100_000
	// gcMaxRuntime is the max time to spend on gc, per table, per run.
	gcMaxRuntime = 5 * time.Minute
)
|
|
|
|
func (s *DB) Service(maintenanceInterval time.Duration) db.DBService {
|
|
return newService(s, maintenanceInterval)
|
|
}
|
|
|
|
type Service struct {
|
|
sdb *DB
|
|
maintenanceInterval time.Duration
|
|
internalMeta *db.Typed
|
|
start chan chan error
|
|
}
|
|
|
|
func (s *Service) String() string {
|
|
return fmt.Sprintf("sqlite.service@%p", s)
|
|
}
|
|
|
|
func newService(sdb *DB, maintenanceInterval time.Duration) *Service {
|
|
return &Service{
|
|
sdb: sdb,
|
|
maintenanceInterval: maintenanceInterval,
|
|
internalMeta: db.NewTyped(sdb, internalMetaPrefix),
|
|
start: make(chan chan error),
|
|
}
|
|
}
|
|
|
|
func (s *Service) StartMaintenance() <-chan error {
|
|
finishChan := make(chan error, 1)
|
|
select {
|
|
case s.start <- finishChan:
|
|
default:
|
|
}
|
|
return finishChan
|
|
}
|
|
|
|
func (s *Service) Serve(ctx context.Context) error {
|
|
// Run periodic maintenance
|
|
// Figure out when we last ran maintenance and schedule accordingly. If
|
|
// it was never, do it now.
|
|
lastMaint, _, _ := s.internalMeta.Time(lastMaintKey)
|
|
nextMaint := lastMaint.Add(s.maintenanceInterval)
|
|
wait := time.Until(nextMaint)
|
|
if wait < 0 {
|
|
wait = time.Minute
|
|
}
|
|
slog.DebugContext(ctx, "Next periodic run due", "after", wait)
|
|
timer := time.NewTimer(wait)
|
|
|
|
if s.maintenanceInterval == 0 {
|
|
timer.Stop()
|
|
}
|
|
|
|
for {
|
|
var finishChan chan error
|
|
select {
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
case <-timer.C:
|
|
case finishChan = <-s.start:
|
|
}
|
|
|
|
err := s.periodic(ctx)
|
|
if finishChan != nil {
|
|
finishChan <- err
|
|
}
|
|
|
|
if err != nil {
|
|
return wrap(err)
|
|
}
|
|
|
|
if s.maintenanceInterval != 0 {
|
|
timer.Reset(s.maintenanceInterval)
|
|
slog.DebugContext(ctx, "Next periodic run due", "after", s.maintenanceInterval)
|
|
}
|
|
|
|
_ = s.internalMeta.PutTime(lastMaintKey, time.Now())
|
|
}
|
|
}
|
|
|
|
func (s *Service) LastMaintenanceTime() time.Time {
|
|
lastMaint, _, _ := s.internalMeta.Time(lastMaintKey)
|
|
return lastMaint
|
|
}
|
|
|
|
func (s *Service) periodic(ctx context.Context) error {
|
|
t0 := time.Now()
|
|
slog.DebugContext(ctx, "Periodic start")
|
|
|
|
t1 := time.Now()
|
|
defer func() { slog.DebugContext(ctx, "Periodic done in", "t1", time.Since(t1), "t0t1", t1.Sub(t0)) }()
|
|
|
|
s.sdb.updateLock.Lock()
|
|
err := tidy(ctx, s.sdb.sql)
|
|
s.sdb.updateLock.Unlock()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return wrap(s.sdb.forEachFolder(func(fdb *folderDB) error {
|
|
// Get the current device sequence, for comparison in the next step.
|
|
seq, err := fdb.GetDeviceSequence(protocol.LocalDeviceID)
|
|
if err != nil {
|
|
return wrap(err)
|
|
}
|
|
// Get the last successful GC sequence. If it's the same as the
|
|
// current sequence, nothing has changed and we can skip the GC
|
|
// entirely.
|
|
meta := db.NewTyped(fdb, internalMetaPrefix)
|
|
if prev, _, err := meta.Int64(lastSuccessfulGCSeqKey); err != nil {
|
|
return wrap(err)
|
|
} else if seq == prev {
|
|
slog.DebugContext(ctx, "Skipping unnecessary GC", "folder", fdb.folderID, "fdb", fdb.baseName)
|
|
return nil
|
|
}
|
|
|
|
// Run the GC steps, in a function to be able to use a deferred
|
|
// unlock.
|
|
if err := func() error {
|
|
fdb.updateLock.Lock()
|
|
defer fdb.updateLock.Unlock()
|
|
|
|
if err := garbageCollectOldDeletedLocked(ctx, fdb); err != nil {
|
|
return wrap(err)
|
|
}
|
|
if err := garbageCollectNamesAndVersions(ctx, fdb); err != nil {
|
|
return wrap(err)
|
|
}
|
|
if err := garbageCollectBlocklistsAndBlocksLocked(ctx, fdb); err != nil {
|
|
return wrap(err)
|
|
}
|
|
return tidy(ctx, fdb.sql)
|
|
}(); err != nil {
|
|
return wrap(err)
|
|
}
|
|
|
|
// Update the successful GC sequence.
|
|
return wrap(meta.PutInt64(lastSuccessfulGCSeqKey, seq))
|
|
}))
|
|
}
|
|
|
|
func tidy(ctx context.Context, db *sqlx.DB) error {
|
|
conn, err := db.Conn(ctx)
|
|
if err != nil {
|
|
return wrap(err)
|
|
}
|
|
defer conn.Close()
|
|
_, _ = conn.ExecContext(ctx, `ANALYZE`)
|
|
_, _ = conn.ExecContext(ctx, `PRAGMA optimize`)
|
|
_, _ = conn.ExecContext(ctx, `PRAGMA incremental_vacuum`)
|
|
_, _ = conn.ExecContext(ctx, `PRAGMA journal_size_limit = 8388608`)
|
|
_, _ = conn.ExecContext(ctx, `PRAGMA wal_checkpoint(TRUNCATE)`)
|
|
return nil
|
|
}
|
|
|
|
func garbageCollectNamesAndVersions(ctx context.Context, fdb *folderDB) error {
|
|
l := slog.With("folder", fdb.folderID, "fdb", fdb.baseName)
|
|
|
|
res, err := fdb.stmt(`
|
|
DELETE FROM file_names
|
|
WHERE NOT EXISTS (SELECT 1 FROM files f WHERE f.name_idx = idx)
|
|
`).Exec()
|
|
if err != nil {
|
|
return wrap(err, "delete names")
|
|
}
|
|
if aff, err := res.RowsAffected(); err == nil {
|
|
l.DebugContext(ctx, "Removed old file names", "affected", aff)
|
|
}
|
|
|
|
res, err = fdb.stmt(`
|
|
DELETE FROM file_versions
|
|
WHERE NOT EXISTS (SELECT 1 FROM files f WHERE f.version_idx = idx)
|
|
`).Exec()
|
|
if err != nil {
|
|
return wrap(err, "delete versions")
|
|
}
|
|
if aff, err := res.RowsAffected(); err == nil {
|
|
l.DebugContext(ctx, "Removed old file versions", "affected", aff)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func garbageCollectOldDeletedLocked(ctx context.Context, fdb *folderDB) error {
|
|
l := slog.With("folder", fdb.folderID, "fdb", fdb.baseName)
|
|
if fdb.deleteRetention <= 0 {
|
|
slog.DebugContext(ctx, "Delete retention is infinite, skipping cleanup")
|
|
return nil
|
|
}
|
|
|
|
// Remove deleted files that are marked as not needed (we have processed
|
|
// them) and they were deleted more than MaxDeletedFileAge ago.
|
|
l.DebugContext(ctx, "Forgetting deleted files", "retention", fdb.deleteRetention)
|
|
res, err := fdb.stmt(`
|
|
DELETE FROM files
|
|
WHERE deleted AND modified < ? AND local_flags & {{.FlagLocalNeeded}} == 0
|
|
`).Exec(time.Now().Add(-fdb.deleteRetention).UnixNano())
|
|
if err != nil {
|
|
return wrap(err)
|
|
}
|
|
if aff, err := res.RowsAffected(); err == nil {
|
|
l.DebugContext(ctx, "Removed old deleted file records", "affected", aff)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func garbageCollectBlocklistsAndBlocksLocked(ctx context.Context, fdb *folderDB) error {
|
|
// Remove all blocklists not referred to by any files and, by extension,
|
|
// any blocks not referred to by a blocklist. This is an expensive
|
|
// operation when run normally, especially if there are a lot of blocks
|
|
// to collect.
|
|
//
|
|
// We make this orders of magnitude faster by disabling foreign keys for
|
|
// the transaction and doing the cleanup manually. This requires using
|
|
// an explicit connection and disabling foreign keys before starting the
|
|
// transaction. We make sure to clean up on the way out.
|
|
|
|
conn, err := fdb.sql.Connx(ctx)
|
|
if err != nil {
|
|
return wrap(err)
|
|
}
|
|
defer conn.Close()
|
|
|
|
if _, err := conn.ExecContext(ctx, `PRAGMA foreign_keys = 0`); err != nil {
|
|
return wrap(err)
|
|
}
|
|
defer func() { //nolint:contextcheck
|
|
_, _ = conn.ExecContext(context.Background(), `PRAGMA foreign_keys = 1`)
|
|
}()
|
|
|
|
tx, err := conn.BeginTxx(ctx, nil)
|
|
if err != nil {
|
|
return wrap(err)
|
|
}
|
|
defer tx.Rollback() //nolint:errcheck
|
|
|
|
// Both blocklists and blocks refer to blocklists_hash from the files table.
|
|
for _, table := range []string{"blocklists", "blocks"} {
|
|
// Count the number of rows
|
|
var rows int64
|
|
if err := tx.GetContext(ctx, &rows, `SELECT count(*) FROM `+table); err != nil {
|
|
return wrap(err)
|
|
}
|
|
|
|
chunks := max(gcMinChunks, rows/gcChunkSize)
|
|
l := slog.With("folder", fdb.folderID, "fdb", fdb.baseName, "table", table, "rows", rows, "chunks", chunks)
|
|
|
|
// Process rows in chunks up to a given time limit. We always use at
|
|
// least gcMinChunks chunks, then increase the number as the number of rows
|
|
// exceeds gcMinChunks*gcChunkSize.
|
|
t0 := time.Now()
|
|
for i, br := range randomBlobRanges(int(chunks)) {
|
|
if d := time.Since(t0); d > gcMaxRuntime {
|
|
l.InfoContext(ctx, "GC was interrupted due to exceeding time limit", "processed", i, "runtime", time.Since(t0))
|
|
break
|
|
}
|
|
|
|
// The limit column must be an indexed column with a mostly random distribution of blobs.
|
|
// That's the blocklist_hash column for blocklists, and the hash column for blocks.
|
|
limitColumn := table + ".blocklist_hash"
|
|
if table == "blocks" {
|
|
limitColumn = "blocks.hash"
|
|
}
|
|
|
|
q := fmt.Sprintf(`
|
|
DELETE FROM %s
|
|
WHERE %s AND NOT EXISTS (
|
|
SELECT 1 FROM files WHERE files.blocklist_hash = %s.blocklist_hash
|
|
)`, table, br.SQL(limitColumn), table)
|
|
|
|
if res, err := tx.ExecContext(ctx, q); err != nil {
|
|
return wrap(err, "delete from "+table)
|
|
} else {
|
|
l.DebugContext(ctx, "GC query result", "processed", i, "runtime", time.Since(t0), "result", slogutil.Expensive(func() any {
|
|
rows, err := res.RowsAffected()
|
|
if err != nil {
|
|
return slogutil.Error(err)
|
|
}
|
|
return slog.Int64("rows", rows)
|
|
}))
|
|
}
|
|
}
|
|
}
|
|
|
|
return wrap(tx.Commit())
|
|
}
|
|
|
|
// blobRange defines a range for blob searching. A range is open ended if
// start or end is nil.
type blobRange struct {
	start, end []byte
}

// SQL returns the SQL where clause for the given range, e.g.
// `column >= x'49249248' AND column < x'6db6db6c'`. A side that is open
// ended is left out. A range that is open on both sides yields the
// always-true predicate `1`, so the result is always a valid expression
// that can be combined with AND.
//
// The name is inserted verbatim and must be a trusted column name.
func (r blobRange) SQL(name string) string {
	if r.start == nil && r.end == nil {
		return "1"
	}

	var sb strings.Builder
	if r.start != nil {
		fmt.Fprintf(&sb, "%s >= x'%x'", name, r.start)
	}
	if r.start != nil && r.end != nil {
		sb.WriteString(" AND ")
	}
	if r.end != nil {
		fmt.Fprintf(&sb, "%s < x'%x'", name, r.end)
	}
	return sb.String()
}
|
|
|
|
// randomBlobRanges returns n blobRanges in random order
|
|
func randomBlobRanges(n int) []blobRange {
|
|
ranges := blobRanges(n)
|
|
rand.Shuffle(len(ranges), func(i, j int) { ranges[i], ranges[j] = ranges[j], ranges[i] })
|
|
return ranges
|
|
}
|
|
|
|
// blobRanges returns n blobRanges
|
|
func blobRanges(n int) []blobRange {
|
|
// We use three byte (24 bit) prefixes to get fairly granular ranges and easy bit
|
|
// conversions.
|
|
rangeSize := (1 << 24) / n
|
|
ranges := make([]blobRange, 0, n)
|
|
var prev []byte
|
|
for i := range n {
|
|
var pref []byte
|
|
if i < n-1 {
|
|
end := (i + 1) * rangeSize
|
|
pref = intToBlob(end)
|
|
}
|
|
ranges = append(ranges, blobRange{prev, pref})
|
|
prev = pref
|
|
}
|
|
return ranges
|
|
}
|
|
|
|
// intToBlob returns the low 24 bits of n as a three byte big endian blob.
// Bits above the low 24 are discarded.
func intToBlob(n int) []byte {
	var pref [4]byte
	binary.BigEndian.PutUint32(pref[:], uint32(n)) //nolint:gosec
	// The first byte is zero for any n below 1<<24 and is not part of the
	// range prefix, so drop it.
	return pref[1:]
}
|