From cf1cf85ce60c87c01abd7162eb9cb28f31a8446a Mon Sep 17 00:00:00 2001 From: Jakob Borg Date: Sun, 6 Apr 2025 05:30:43 -0700 Subject: [PATCH] chore(db): use one SQLite database per folder (#10042) This changes the database structure to use one database per folder, with a small main database to coordinate. Reverts the prior change to buffer all files in memory when pulling, meaning there is now a phase where the WAL file will grow significantly, at least for initial sync of folders with many directories. --------- Co-authored-by: bt90 --- internal/db/interface.go | 4 +- internal/db/metrics.go | 6 +- internal/db/sqlite/basedb.go | 251 ++++++++ internal/db/sqlite/db.go | 88 --- internal/db/sqlite/db_folderdb.go | 391 ++++++++++++ internal/db/sqlite/db_kv.go | 8 +- internal/db/sqlite/db_local_test.go | 41 +- internal/db/sqlite/db_mtimes.go | 54 -- internal/db/sqlite/db_open.go | 247 +++----- internal/db/sqlite/db_schema.go | 88 --- internal/db/sqlite/db_service.go | 56 +- internal/db/sqlite/db_test.go | 16 +- internal/db/sqlite/db_update.go | 564 +----------------- .../{db_counts.go => folderdb_counts.go} | 50 +- .../{db_global.go => folderdb_global.go} | 61 +- .../{db_indexid.go => folderdb_indexid.go} | 50 +- .../sqlite/{db_local.go => folderdb_local.go} | 72 +-- internal/db/sqlite/folderdb_mtimes.go | 45 ++ internal/db/sqlite/folderdb_open.go | 110 ++++ internal/db/sqlite/folderdb_update.go | 531 +++++++++++++++++ .../sql/schema/{ => common}/10-schema.sql | 0 .../sqlite/sql/schema/{ => common}/70-kv.sql | 0 .../sql/schema/{ => folder}/00-indexes.sql | 9 +- .../sql/schema/{ => folder}/20-files.sql | 12 +- .../sql/schema/{ => folder}/30-indexids.sql | 8 +- .../sql/schema/{ => folder}/40-counts.sql | 22 +- .../sql/schema/{ => folder}/50-blocks.sql | 0 .../sql/schema/{ => folder}/50-mtimes.sql | 4 +- .../db/sqlite/sql/schema/main/00-indexes.sql | 12 + lib/locations/locations.go | 2 +- lib/model/folder_sendrecv.go | 104 ++-- lib/model/folder_sendrecv_test.go | 9 +- 32 files changed, 1696 insertions(+), 1219 deletions(-) create mode 100644 internal/db/sqlite/basedb.go delete mode 100644 internal/db/sqlite/db.go create mode 100644 internal/db/sqlite/db_folderdb.go delete mode 100644 internal/db/sqlite/db_mtimes.go delete mode 100644 internal/db/sqlite/db_schema.go rename internal/db/sqlite/{db_counts.go => folderdb_counts.go} (60%) rename internal/db/sqlite/{db_global.go => folderdb_global.go} (63%) rename internal/db/sqlite/{db_indexid.go => folderdb_indexid.go} (64%) rename internal/db/sqlite/{db_local.go => folderdb_local.go} (59%) create mode 100644 internal/db/sqlite/folderdb_mtimes.go create mode 100644 internal/db/sqlite/folderdb_open.go create mode 100644 internal/db/sqlite/folderdb_update.go rename internal/db/sqlite/sql/schema/{ => common}/10-schema.sql (100%) rename internal/db/sqlite/sql/schema/{ => common}/70-kv.sql (100%) rename internal/db/sqlite/sql/schema/{ => folder}/00-indexes.sql (63%) rename internal/db/sqlite/sql/schema/{ => folder}/20-files.sql (88%) rename internal/db/sqlite/sql/schema/{ => folder}/30-indexids.sql (70%) rename internal/db/sqlite/sql/schema/{ => folder}/40-counts.sql (57%) rename internal/db/sqlite/sql/schema/{ => folder}/50-blocks.sql (100%) rename internal/db/sqlite/sql/schema/{ => folder}/50-mtimes.sql (76%) create mode 100644 internal/db/sqlite/sql/schema/main/00-indexes.sql diff --git a/internal/db/interface.go b/internal/db/interface.go index 550143d73..15252d33c 100644 --- a/internal/db/interface.go +++ b/internal/db/interface.go @@ -36,13 +36,13 @@ type DB interface { // required. AllGlobalFiles(folder string) (iter.Seq[FileMetadata], func() error) AllGlobalFilesPrefix(folder string, prefix string) (iter.Seq[FileMetadata], func() error) - AllLocalBlocksWithHash(hash []byte) (iter.Seq[BlockMapEntry], func() error) AllLocalFiles(folder string, device protocol.DeviceID) (iter.Seq[protocol.FileInfo], func() error) AllLocalFilesBySequence(folder string, device protocol.DeviceID, startSeq int64, limit int) (iter.Seq[protocol.FileInfo], func() error) AllLocalFilesWithPrefix(folder string, device protocol.DeviceID, prefix string) (iter.Seq[protocol.FileInfo], func() error) AllLocalFilesWithBlocksHash(folder string, h []byte) (iter.Seq[FileMetadata], func() error) - AllLocalFilesWithBlocksHashAnyFolder(h []byte) (iter.Seq2[string, FileMetadata], func() error) AllNeededGlobalFiles(folder string, device protocol.DeviceID, order config.PullOrder, limit, offset int) (iter.Seq[protocol.FileInfo], func() error) + AllLocalBlocksWithHash(hash []byte) ([]BlockMapEntry, error) + AllLocalFilesWithBlocksHashAnyFolder(hash []byte) (map[string][]FileMetadata, error) // Cleanup DropAllFiles(folder string, device protocol.DeviceID) error diff --git a/internal/db/metrics.go b/internal/db/metrics.go index d59408dc4..f906bceed 100644 --- a/internal/db/metrics.go +++ b/internal/db/metrics.go @@ -67,9 +67,9 @@ func (m metricsDB) AllLocalFilesWithBlocksHash(folder string, h []byte) (iter.Se return m.DB.AllLocalFilesWithBlocksHash(folder, h) } -func (m metricsDB) AllLocalFilesWithBlocksHashAnyFolder(h []byte) (iter.Seq2[string, FileMetadata], func() error) { +func (m metricsDB) AllLocalFilesWithBlocksHashAnyFolder(hash []byte) (map[string][]FileMetadata, error) { defer m.account("-", "AllLocalFilesWithBlocksHashAnyFolder")() - return m.DB.AllLocalFilesWithBlocksHashAnyFolder(h) + return m.DB.AllLocalFilesWithBlocksHashAnyFolder(hash) } func (m metricsDB) AllGlobalFiles(folder string) (iter.Seq[FileMetadata], func() error) { @@ -107,7 +107,7 @@ func (m metricsDB) GetGlobalAvailability(folder, file string) ([]protocol.Device return m.DB.GetGlobalAvailability(folder, file) } -func (m metricsDB) AllLocalBlocksWithHash(hash []byte) (iter.Seq[BlockMapEntry], func() error) { +func (m metricsDB) AllLocalBlocksWithHash(hash []byte) ([]BlockMapEntry, error) { defer m.account("-", "AllLocalBlocksWithHash")() return m.DB.AllLocalBlocksWithHash(hash) } diff --git a/internal/db/sqlite/basedb.go b/internal/db/sqlite/basedb.go new file mode 100644 index 000000000..63f09c160 --- /dev/null +++ b/internal/db/sqlite/basedb.go @@ -0,0 +1,251 @@ +// Copyright (C) 2025 The Syncthing Authors. +// +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this file, +// You can obtain one at https://mozilla.org/MPL/2.0/. + +package sqlite + +import ( + "database/sql" + "embed" + "io/fs" + "path/filepath" + "strconv" + "strings" + "sync" + "text/template" + "time" + + "github.com/jmoiron/sqlx" + "github.com/syncthing/syncthing/lib/build" + "github.com/syncthing/syncthing/lib/protocol" +) + +const currentSchemaVersion = 1 + +//go:embed sql/** +var embedded embed.FS + +type baseDB struct { + path string + baseName string + sql *sqlx.DB + + updateLock sync.Mutex + updatePoints int + checkpointsCount int + + statementsMut sync.RWMutex + statements map[string]*sqlx.Stmt + tplInput map[string]any +} + +func openBase(path string, maxConns int, pragmas, schemaScripts, migrationScripts []string) (*baseDB, error) { + // Open the database with options to enable foreign keys and recursive + // triggers (needed for the delete+insert triggers on row replace). + sqlDB, err := sqlx.Open(dbDriver, "file:"+path+"?"+commonOptions) + if err != nil { + return nil, wrap(err) + } + + sqlDB.SetMaxOpenConns(maxConns) + + for _, pragma := range pragmas { + if _, err := sqlDB.Exec("PRAGMA " + pragma); err != nil { + return nil, wrap(err, "PRAGMA "+pragma) + } + } + + db := &baseDB{ + path: path, + baseName: filepath.Base(path), + sql: sqlDB, + statements: make(map[string]*sqlx.Stmt), + } + + for _, script := range schemaScripts { + if err := db.runScripts(script); err != nil { + return nil, wrap(err) + } + } + + ver, _ := db.getAppliedSchemaVersion() + if ver.SchemaVersion > 0 { + filter := func(scr string) bool { + scr = filepath.Base(scr) + nstr, _, ok := strings.Cut(scr, "-") + if !ok { + return false + } + n, err := strconv.ParseInt(nstr, 10, 32) + if err != nil { + return false + } + return int(n) > ver.SchemaVersion + } + for _, script := range migrationScripts { + if err := db.runScripts(script, filter); err != nil { + return nil, wrap(err) + } + } + } + + // Set the current schema version, if not already set + if err := db.setAppliedSchemaVersion(currentSchemaVersion); err != nil { + return nil, wrap(err) + } + + db.tplInput = map[string]any{ + "FlagLocalUnsupported": protocol.FlagLocalUnsupported, + "FlagLocalIgnored": protocol.FlagLocalIgnored, + "FlagLocalMustRescan": protocol.FlagLocalMustRescan, + "FlagLocalReceiveOnly": protocol.FlagLocalReceiveOnly, + "FlagLocalGlobal": protocol.FlagLocalGlobal, + "FlagLocalNeeded": protocol.FlagLocalNeeded, + "SyncthingVersion": build.LongVersion, + } + + return db, nil +} + +func (s *baseDB) Close() error { + s.updateLock.Lock() + s.statementsMut.Lock() + defer s.updateLock.Unlock() + defer s.statementsMut.Unlock() + for _, stmt := range s.statements { + stmt.Close() + } + return wrap(s.sql.Close()) +} + +var tplFuncs = template.FuncMap{ + "or": func(vs ...int) int { + v := vs[0] + for _, ov := range vs[1:] { + v |= ov + } + return v + }, +} + +// stmt returns a prepared statement for the given SQL string, after +// applying local template expansions. The statement is cached. +func (s *baseDB) stmt(tpl string) stmt { + tpl = strings.TrimSpace(tpl) + + // Fast concurrent lookup of cached statement + s.statementsMut.RLock() + stmt, ok := s.statements[tpl] + s.statementsMut.RUnlock() + if ok { + return stmt + } + + // On miss, take the full lock, check again + s.statementsMut.Lock() + defer s.statementsMut.Unlock() + stmt, ok = s.statements[tpl] + if ok { + return stmt + } + + // Apply template expansions + var sb strings.Builder + compTpl := template.Must(template.New("tpl").Funcs(tplFuncs).Parse(tpl)) + if err := compTpl.Execute(&sb, s.tplInput); err != nil { + panic("bug: bad template: " + err.Error()) + } + + // Prepare and cache + stmt, err := s.sql.Preparex(sb.String()) + if err != nil { + return failedStmt{err} + } + s.statements[tpl] = stmt + return stmt +} + +type stmt interface { + Exec(args ...any) (sql.Result, error) + Get(dest any, args ...any) error + Queryx(args ...any) (*sqlx.Rows, error) + Select(dest any, args ...any) error +} + +type failedStmt struct { + err error +} + +func (f failedStmt) Exec(_ ...any) (sql.Result, error) { return nil, f.err } +func (f failedStmt) Get(_ any, _ ...any) error { return f.err } +func (f failedStmt) Queryx(_ ...any) (*sqlx.Rows, error) { return nil, f.err } +func (f failedStmt) Select(_ any, _ ...any) error { return f.err } + +func (s *baseDB) runScripts(glob string, filter ...func(s string) bool) error { + scripts, err := fs.Glob(embedded, glob) + if err != nil { + return wrap(err) + } + + tx, err := s.sql.Begin() + if err != nil { + return wrap(err) + } + defer tx.Rollback() //nolint:errcheck + +nextScript: + for _, scr := range scripts { + for _, fn := range filter { + if !fn(scr) { + l.Debugln(s.baseName, "skipping script", scr) + continue nextScript + } + } + l.Debugln(s.baseName, "executing script", scr) + bs, err := fs.ReadFile(embedded, scr) + if err != nil { + return wrap(err, scr) + } + // SQLite requires one statement per exec, so we split the init + // files on lines containing only a semicolon and execute them + // separately. We require it on a separate line because there are + // also statement-internal semicolons in the triggers. + for _, stmt := range strings.Split(string(bs), "\n;") { + if _, err := tx.Exec(stmt); err != nil { + return wrap(err, stmt) + } + } + } + + return wrap(tx.Commit()) +} + +type schemaVersion struct { + SchemaVersion int + AppliedAt int64 + SyncthingVersion string +} + +func (s *schemaVersion) AppliedTime() time.Time { + return time.Unix(0, s.AppliedAt) +} + +func (s *baseDB) setAppliedSchemaVersion(ver int) error { + _, err := s.stmt(` + INSERT OR IGNORE INTO schemamigrations (schema_version, applied_at, syncthing_version) + VALUES (?, ?, ?) + `).Exec(ver, time.Now().UnixNano(), build.LongVersion) + return wrap(err) +} + +func (s *baseDB) getAppliedSchemaVersion() (schemaVersion, error) { + var v schemaVersion + err := s.stmt(` + SELECT schema_version as schemaversion, applied_at as appliedat, syncthing_version as syncthingversion FROM schemamigrations + ORDER BY schema_version DESC + LIMIT 1 + `).Get(&v) + return v, wrap(err) +} diff --git a/internal/db/sqlite/db.go b/internal/db/sqlite/db.go deleted file mode 100644 index a0a6a5f1f..000000000 --- a/internal/db/sqlite/db.go +++ /dev/null @@ -1,88 +0,0 @@ -// Copyright (C) 2025 The Syncthing Authors. -// -// This Source Code Form is subject to the terms of the Mozilla Public -// License, v. 2.0. If a copy of the MPL was not distributed with this file, -// You can obtain one at https://mozilla.org/MPL/2.0/. - -package sqlite - -import ( - "sync" - "time" - - "github.com/jmoiron/sqlx" - "github.com/syncthing/syncthing/internal/db" - "github.com/syncthing/syncthing/lib/protocol" - "github.com/thejerf/suture/v4" -) - -type DB struct { - sql *sqlx.DB - localDeviceIdx int64 - deleteRetention time.Duration - - updateLock sync.Mutex - updatePoints int - - statementsMut sync.RWMutex - statements map[string]*sqlx.Stmt - tplInput map[string]any -} - -var _ db.DB = (*DB)(nil) - -type Option func(*DB) - -func WithDeleteRetention(d time.Duration) Option { - return func(s *DB) { - s.deleteRetention = d - } -} - -func (s *DB) Close() error { - s.updateLock.Lock() - s.statementsMut.Lock() - defer s.updateLock.Unlock() - defer s.statementsMut.Unlock() - for _, stmt := range s.statements { - stmt.Close() - } - return wrap(s.sql.Close()) -} - -func (s *DB) Service(maintenanceInterval time.Duration) suture.Service { - return newService(s, maintenanceInterval) -} - -func (s *DB) ListFolders() ([]string, error) { - var res []string - err := s.stmt(` - SELECT folder_id FROM folders - ORDER BY folder_id - `).Select(&res) - return res, wrap(err) -} - -func (s *DB) ListDevicesForFolder(folder string) ([]protocol.DeviceID, error) { - var res []string - err := s.stmt(` - SELECT d.device_id FROM counts s - INNER JOIN folders o ON o.idx = s.folder_idx - INNER JOIN devices d ON d.idx = s.device_idx - WHERE o.folder_id = ? AND s.count > 0 AND s.device_idx != {{.LocalDeviceIdx}} - GROUP BY d.device_id - ORDER BY d.device_id - `).Select(&res, folder) - if err != nil { - return nil, wrap(err) - } - - devs := make([]protocol.DeviceID, len(res)) - for i, s := range res { - devs[i], err = protocol.DeviceIDFromString(s) - if err != nil { - return nil, wrap(err) - } - } - return devs, nil -} diff --git a/internal/db/sqlite/db_folderdb.go b/internal/db/sqlite/db_folderdb.go new file mode 100644 index 000000000..1af061a28 --- /dev/null +++ b/internal/db/sqlite/db_folderdb.go @@ -0,0 +1,391 @@ +// Copyright (C) 2025 The Syncthing Authors. +// +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this file, +// You can obtain one at https://mozilla.org/MPL/2.0/. + +package sqlite + +import ( + "database/sql" + "errors" + "fmt" + "iter" + "path/filepath" + "time" + + "github.com/syncthing/syncthing/internal/db" + "github.com/syncthing/syncthing/internal/itererr" + "github.com/syncthing/syncthing/lib/config" + "github.com/syncthing/syncthing/lib/protocol" +) + +var errNoSuchFolder = errors.New("no such folder") + +func (s *DB) getFolderDB(folder string, create bool) (*folderDB, error) { + // Check for an already open database + s.folderDBsMut.RLock() + fdb, ok := s.folderDBs[folder] + s.folderDBsMut.RUnlock() + if ok { + return fdb, nil + } + + // Check for an existing folder. If we're not supposed to create the + // folder, we don't move on if it doesn't already have an ID. + var idx int64 + if err := s.stmt(` + SELECT idx FROM folders + WHERE folder_id = ? + `).Get(&idx, folder); err != nil && !errors.Is(err, sql.ErrNoRows) { + return nil, wrap(err) + } + if idx == 0 && !create { + return nil, errNoSuchFolder + } + + // Create a folder ID and database if it does not already exist + s.folderDBsMut.Lock() + defer s.folderDBsMut.Unlock() + if fdb, ok := s.folderDBs[folder]; ok { + return fdb, nil + } + + var err error + if idx == 0 { + // First time we want to access this folder, need to create a new + // folder ID + idx, err = s.folderIdxLocked(folder) + if err != nil { + return nil, err + } + } + + name := fmt.Sprintf("folder.%04x.db", idx) + path := filepath.Join(s.pathBase, name) + fdb, err = s.folderDBOpener(folder, path, s.deleteRetention) + if err != nil { + return nil, err + } + s.folderDBs[folder] = fdb + return fdb, nil +} + +func (s *DB) Update(folder string, device protocol.DeviceID, fs []protocol.FileInfo) error { + fdb, err := s.getFolderDB(folder, true) + if err != nil { + return err + } + return fdb.Update(device, fs) +} + +func (s *DB) GetDeviceFile(folder string, device protocol.DeviceID, file string) (protocol.FileInfo, bool, error) { + fdb, err := s.getFolderDB(folder, false) + if errors.Is(err, errNoSuchFolder) { + return protocol.FileInfo{}, false, nil + } + if err != nil { + return protocol.FileInfo{}, false, err + } + return fdb.GetDeviceFile(device, file) +} + +func (s *DB) GetGlobalAvailability(folder, file string) ([]protocol.DeviceID, error) { + fdb, err := s.getFolderDB(folder, false) + if errors.Is(err, errNoSuchFolder) { + return nil, nil + } + if err != nil { + return nil, err + } + return fdb.GetGlobalAvailability(file) +} + +func (s *DB) GetGlobalFile(folder string, file string) (protocol.FileInfo, bool, error) { + fdb, err := s.getFolderDB(folder, false) + if errors.Is(err, errNoSuchFolder) { + return protocol.FileInfo{}, false, nil + } + if err != nil { + return protocol.FileInfo{}, false, err + } + return fdb.GetGlobalFile(file) +} + +func (s *DB) AllGlobalFiles(folder string) (iter.Seq[db.FileMetadata], func() error) { + fdb, err := s.getFolderDB(folder, false) + if errors.Is(err, errNoSuchFolder) { + return func(yield func(db.FileMetadata) bool) {}, func() error { return nil } + } + if err != nil { + return func(yield func(db.FileMetadata) bool) {}, func() error { return err } + } + return fdb.AllGlobalFiles() +} + +func (s *DB) AllGlobalFilesPrefix(folder string, prefix string) (iter.Seq[db.FileMetadata], func() error) { + fdb, err := s.getFolderDB(folder, false) + if errors.Is(err, errNoSuchFolder) { + return func(yield func(db.FileMetadata) bool) {}, func() error { return nil } + } + if err != nil { + return func(yield func(db.FileMetadata) bool) {}, func() error { return err } + } + return fdb.AllGlobalFilesPrefix(prefix) +} + +func (s *DB) AllLocalBlocksWithHash(hash []byte) ([]db.BlockMapEntry, error) { + var entries []db.BlockMapEntry + err := s.forEachFolder(func(fdb *folderDB) error { + es, err := itererr.Collect(fdb.AllLocalBlocksWithHash(hash)) + entries = append(entries, es...) + return err + }) + return entries, err +} + +func (s *DB) AllLocalFilesWithBlocksHashAnyFolder(hash []byte) (map[string][]db.FileMetadata, error) { + res := make(map[string][]db.FileMetadata) + err := s.forEachFolder(func(fdb *folderDB) error { + files, err := itererr.Collect(fdb.AllLocalFilesWithBlocksHash(hash)) + res[fdb.folderID] = files + return err + }) + return res, err +} + +func (s *DB) AllLocalFiles(folder string, device protocol.DeviceID) (iter.Seq[protocol.FileInfo], func() error) { + fdb, err := s.getFolderDB(folder, false) + if errors.Is(err, errNoSuchFolder) { + return func(yield func(protocol.FileInfo) bool) {}, func() error { return nil } + } + if err != nil { + return func(yield func(protocol.FileInfo) bool) {}, func() error { return err } + } + return fdb.AllLocalFiles(device) +} + +func (s *DB) AllLocalFilesBySequence(folder string, device protocol.DeviceID, startSeq int64, limit int) (iter.Seq[protocol.FileInfo], func() error) { + fdb, err := s.getFolderDB(folder, false) + if errors.Is(err, errNoSuchFolder) { + return func(yield func(protocol.FileInfo) bool) {}, func() error { return nil } + } + if err != nil { + return func(yield func(protocol.FileInfo) bool) {}, func() error { return err } + } + return fdb.AllLocalFilesBySequence(device, startSeq, limit) +} + +func (s *DB) AllLocalFilesWithPrefix(folder string, device protocol.DeviceID, prefix string) (iter.Seq[protocol.FileInfo], func() error) { + fdb, err := s.getFolderDB(folder, false) + if errors.Is(err, errNoSuchFolder) { + return func(yield func(protocol.FileInfo) bool) {}, func() error { return nil } + } + if err != nil { + return func(yield func(protocol.FileInfo) bool) {}, func() error { return err } + } + return fdb.AllLocalFilesWithPrefix(device, prefix) +} + +func (s *DB) AllLocalFilesWithBlocksHash(folder string, h []byte) (iter.Seq[db.FileMetadata], func() error) { + fdb, err := s.getFolderDB(folder, false) + if errors.Is(err, errNoSuchFolder) { + return func(yield func(db.FileMetadata) bool) {}, func() error { return nil } + } + if err != nil { + return func(yield func(db.FileMetadata) bool) {}, func() error { return err } + } + return fdb.AllLocalFilesWithBlocksHash(h) +} + +func (s *DB) AllNeededGlobalFiles(folder string, device protocol.DeviceID, order config.PullOrder, limit, offset int) (iter.Seq[protocol.FileInfo], func() error) { + fdb, err := s.getFolderDB(folder, false) + if errors.Is(err, errNoSuchFolder) { + return func(yield func(protocol.FileInfo) bool) {}, func() error { return nil } + } + if err != nil { + return func(yield func(protocol.FileInfo) bool) {}, func() error { return err } + } + return fdb.AllNeededGlobalFiles(device, order, limit, offset) +} + +func (s *DB) DropAllFiles(folder string, device protocol.DeviceID) error { + fdb, err := s.getFolderDB(folder, false) + if errors.Is(err, errNoSuchFolder) { + return nil + } + if err != nil { + return err + } + return fdb.DropAllFiles(device) +} + +func (s *DB) DropFilesNamed(folder string, device protocol.DeviceID, names []string) error { + fdb, err := s.getFolderDB(folder, false) + if errors.Is(err, errNoSuchFolder) { + return nil + } + if err != nil { + return err + } + return fdb.DropFilesNamed(device, names) +} + +func (s *DB) ListDevicesForFolder(folder string) ([]protocol.DeviceID, error) { + fdb, err := s.getFolderDB(folder, false) + if errors.Is(err, errNoSuchFolder) { + return nil, nil + } + if err != nil { + return nil, err + } + return fdb.ListDevicesForFolder() +} + +func (s *DB) RemoteSequences(folder string) (map[protocol.DeviceID]int64, error) { + fdb, err := s.getFolderDB(folder, false) + if errors.Is(err, errNoSuchFolder) { + return nil, nil + } + if err != nil { + return nil, err + } + return fdb.RemoteSequences() +} + +func (s *DB) CountGlobal(folder string) (db.Counts, error) { + fdb, err := s.getFolderDB(folder, false) + if errors.Is(err, errNoSuchFolder) { + return db.Counts{}, nil + } + if err != nil { + return db.Counts{}, err + } + return fdb.CountGlobal() +} + +func (s *DB) CountLocal(folder string, device protocol.DeviceID) (db.Counts, error) { + fdb, err := s.getFolderDB(folder, false) + if errors.Is(err, errNoSuchFolder) { + return db.Counts{}, nil + } + if err != nil { + return db.Counts{}, err + } + return fdb.CountLocal(device) +} + +func (s *DB) CountNeed(folder string, device protocol.DeviceID) (db.Counts, error) { + fdb, err := s.getFolderDB(folder, false) + if errors.Is(err, errNoSuchFolder) { + return db.Counts{}, nil + } + if err != nil { + return db.Counts{}, err + } + return fdb.CountNeed(device) +} + +func (s *DB) CountReceiveOnlyChanged(folder string) (db.Counts, error) { + fdb, err := s.getFolderDB(folder, false) + if errors.Is(err, errNoSuchFolder) { + return db.Counts{}, nil + } + if err != nil { + return db.Counts{}, err + } + return fdb.CountReceiveOnlyChanged() +} + +func (s *DB) DropAllIndexIDs() error { + return s.forEachFolder(func(fdb *folderDB) error { + return fdb.DropAllIndexIDs() + }) +} + +func (s *DB) GetIndexID(folder string, device protocol.DeviceID) (protocol.IndexID, error) { + fdb, err := s.getFolderDB(folder, true) + if err != nil { + return 0, err + } + return fdb.GetIndexID(device) +} + +func (s *DB) SetIndexID(folder string, device protocol.DeviceID, id protocol.IndexID) error { + fdb, err := s.getFolderDB(folder, true) + if err != nil { + return err + } + return fdb.SetIndexID(device, id) +} + +func (s *DB) GetDeviceSequence(folder string, device protocol.DeviceID) (int64, error) { + fdb, err := s.getFolderDB(folder, false) + if errors.Is(err, errNoSuchFolder) { + return 0, nil + } + if err != nil { + return 0, err + } + return fdb.GetDeviceSequence(device) +} + +func (s *DB) DeleteMtime(folder, name string) error { + fdb, err := s.getFolderDB(folder, false) + if errors.Is(err, errNoSuchFolder) { + return nil + } + if err != nil { + return err + } + return fdb.DeleteMtime(name) +} + +func (s *DB) GetMtime(folder, name string) (ondisk, virtual time.Time) { + fdb, err := s.getFolderDB(folder, false) + if errors.Is(err, errNoSuchFolder) { + return time.Time{}, time.Time{} + } + if err != nil { + return time.Time{}, time.Time{} + } + return fdb.GetMtime(name) +} + +func (s *DB) PutMtime(folder, name string, ondisk, virtual time.Time) error { + fdb, err := s.getFolderDB(folder, true) + if err != nil { + return err + } + return fdb.PutMtime(name, ondisk, virtual) +} + +func (s *DB) DropDevice(device protocol.DeviceID) error { + return s.forEachFolder(func(fdb *folderDB) error { + return fdb.DropDevice(device) + }) +} + +// forEachFolder runs the function for each currently open folderDB, +// returning the first error that was encountered. +func (s *DB) forEachFolder(fn func(fdb *folderDB) error) error { + folders, err := s.ListFolders() + if err != nil { + return err + } + + var firstError error + for _, folder := range folders { + fdb, err := s.getFolderDB(folder, false) + if err != nil { + if firstError == nil { + firstError = err + } + continue + } + if err := fn(fdb); err != nil && firstError == nil { + firstError = err + } + } + return firstError +} diff --git a/internal/db/sqlite/db_kv.go b/internal/db/sqlite/db_kv.go index 077ad3609..8e81d7d72 100644 --- a/internal/db/sqlite/db_kv.go +++ b/internal/db/sqlite/db_kv.go @@ -13,7 +13,7 @@ import ( "github.com/syncthing/syncthing/internal/db" ) -func (s *DB) GetKV(key string) ([]byte, error) { +func (s *baseDB) GetKV(key string) ([]byte, error) { var val []byte if err := s.stmt(` SELECT value FROM kv @@ -24,7 +24,7 @@ func (s *DB) GetKV(key string) ([]byte, error) { return val, nil } -func (s *DB) PutKV(key string, val []byte) error { +func (s *baseDB) PutKV(key string, val []byte) error { s.updateLock.Lock() defer s.updateLock.Unlock() _, err := s.stmt(` @@ -34,7 +34,7 @@ func (s *DB) PutKV(key string, val []byte) error { return wrap(err) } -func (s *DB) DeleteKV(key string) error { +func (s *baseDB) DeleteKV(key string) error { s.updateLock.Lock() defer s.updateLock.Unlock() _, err := s.stmt(` @@ -43,7 +43,7 @@ func (s *DB) DeleteKV(key string) error { return wrap(err) } -func (s *DB) PrefixKV(prefix string) (iter.Seq[db.KeyValue], func() error) { +func (s *baseDB) PrefixKV(prefix string) (iter.Seq[db.KeyValue], func() error) { var rows *sqlx.Rows var err error if prefix == "" { diff --git a/internal/db/sqlite/db_local_test.go b/internal/db/sqlite/db_local_test.go index 8444a07d8..b1e98954e 100644 --- a/internal/db/sqlite/db_local_test.go +++ b/internal/db/sqlite/db_local_test.go @@ -9,8 +9,6 @@ package sqlite import ( "testing" - "github.com/syncthing/syncthing/internal/db" - "github.com/syncthing/syncthing/internal/itererr" "github.com/syncthing/syncthing/lib/protocol" ) @@ -52,7 +50,7 @@ func TestBlocks(t *testing.T) { // Search for blocks - vals, err := itererr.Collect(db.AllLocalBlocksWithHash([]byte{1, 2, 3})) + vals, err := db.AllLocalBlocksWithHash([]byte{1, 2, 3}) if err != nil { t.Fatal(err) } @@ -66,27 +64,23 @@ func TestBlocks(t *testing.T) { // Get FileInfos for those blocks - found := 0 - it, errFn := db.AllLocalFilesWithBlocksHashAnyFolder(vals[0].BlocklistHash) - for folder, fileInfo := range it { - if folder != folderID { - t.Fatal("should be same folder") - } - if fileInfo.Name != "file1" { - t.Fatal("should be file1") - } - found++ - } - if err := errFn(); err != nil { + res, err := db.AllLocalFilesWithBlocksHashAnyFolder(vals[0].BlocklistHash) + if err != nil { t.Fatal(err) } - if found != 1 { + if len(res) != 1 { + t.Fatal("should return one folder") + } + if len(res[folderID]) != 1 { t.Fatal("should find one file") } + if res[folderID][0].Name != "file1" { + t.Fatal("should be file1") + } // Get the other blocks - vals, err = itererr.Collect(db.AllLocalBlocksWithHash([]byte{3, 4, 5})) + vals, err = db.AllLocalBlocksWithHash([]byte{3, 4, 5}) if err != nil { t.Fatal(err) } @@ -125,7 +119,10 @@ func TestBlocksDeleted(t *testing.T) { // We should find one entry for the block hash search := file.Blocks[0].Hash - es := mustCollect[db.BlockMapEntry](t)(sdb.AllLocalBlocksWithHash(search)) + es, err := sdb.AllLocalBlocksWithHash(search) + if err != nil { + t.Fatal(err) + } if len(es) != 1 { t.Fatal("expected one hit") } @@ -137,13 +134,17 @@ func TestBlocksDeleted(t *testing.T) { } // Searching for the old hash should yield no hits - if hits := mustCollect[db.BlockMapEntry](t)(sdb.AllLocalBlocksWithHash(search)); len(hits) != 0 { + if hits, err := sdb.AllLocalBlocksWithHash(search); err != nil { + t.Fatal(err) + } else if len(hits) != 0 { t.Log(hits) t.Error("expected no hits") } // Searching for the new hash should yield one hits - if hits := mustCollect[db.BlockMapEntry](t)(sdb.AllLocalBlocksWithHash(file.Blocks[0].Hash)); len(hits) != 1 { + if hits, err := sdb.AllLocalBlocksWithHash(file.Blocks[0].Hash); err != nil { + t.Fatal(err) + } else if len(hits) != 1 { t.Log(hits) t.Error("expected one hit") } diff --git a/internal/db/sqlite/db_mtimes.go b/internal/db/sqlite/db_mtimes.go deleted file mode 100644 index 1c3dc7765..000000000 --- a/internal/db/sqlite/db_mtimes.go +++ /dev/null @@ -1,54 +0,0 @@ -// Copyright (C) 2025 The Syncthing Authors. -// -// This Source Code Form is subject to the terms of the Mozilla Public -// License, v. 2.0. If a copy of the MPL was not distributed with this file, -// You can obtain one at https://mozilla.org/MPL/2.0/. - -package sqlite - -import ( - "time" -) - -func (s *DB) GetMtime(folder, name string) (ondisk, virtual time.Time) { - var res struct { - Ondisk int64 - Virtual int64 - } - if err := s.stmt(` - SELECT m.ondisk, m.virtual FROM mtimes m - INNER JOIN folders o ON o.idx = m.folder_idx - WHERE o.folder_id = ? AND m.name = ? - `).Get(&res, folder, name); err != nil { - return time.Time{}, time.Time{} - } - return time.Unix(0, res.Ondisk), time.Unix(0, res.Virtual) -} - -func (s *DB) PutMtime(folder, name string, ondisk, virtual time.Time) error { - s.updateLock.Lock() - defer s.updateLock.Unlock() - folderIdx, err := s.folderIdxLocked(folder) - if err != nil { - return wrap(err) - } - _, err = s.stmt(` - INSERT OR REPLACE INTO mtimes (folder_idx, name, ondisk, virtual) - VALUES (?, ?, ?, ?) - `).Exec(folderIdx, name, ondisk.UnixNano(), virtual.UnixNano()) - return wrap(err) -} - -func (s *DB) DeleteMtime(folder, name string) error { - s.updateLock.Lock() - defer s.updateLock.Unlock() - folderIdx, err := s.folderIdxLocked(folder) - if err != nil { - return wrap(err) - } - _, err = s.stmt(` - DELETE FROM mtimes - WHERE folder_idx = ? AND name = ? - `).Exec(folderIdx, name) - return wrap(err) -} diff --git a/internal/db/sqlite/db_open.go b/internal/db/sqlite/db_open.go index b7d6346b7..45c3547a0 100644 --- a/internal/db/sqlite/db_open.go +++ b/internal/db/sqlite/db_open.go @@ -7,57 +7,104 @@ package sqlite import ( - "database/sql" "os" "path/filepath" - "strconv" - "strings" - "text/template" + "sync" + "time" - "github.com/jmoiron/sqlx" - "github.com/syncthing/syncthing/lib/build" - "github.com/syncthing/syncthing/lib/protocol" + "github.com/syncthing/syncthing/internal/db" ) -const maxDBConns = 128 +const maxDBConns = 16 + +type DB struct { + pathBase string + deleteRetention time.Duration + + *baseDB + + folderDBsMut sync.RWMutex + folderDBs map[string]*folderDB + folderDBOpener func(folder, path string, deleteRetention time.Duration) (*folderDB, error) +} + +var _ db.DB = (*DB)(nil) + +type Option func(*DB) + +func WithDeleteRetention(d time.Duration) Option { + return func(s *DB) { + s.deleteRetention = d + } +} func Open(path string, opts ...Option) (*DB, error) { - // Open the database with options to enable foreign keys and recursive - // triggers (needed for the delete+insert triggers on row replace). - sqlDB, err := sqlx.Open(dbDriver, "file:"+path+"?"+commonOptions) + pragmas := []string{ + "journal_mode = WAL", + "optimize = 0x10002", + "auto_vacuum = INCREMENTAL", + "default_temp_store = MEMORY", + "temp_store = MEMORY", + } + schemas := []string{ + "sql/schema/common/*", + "sql/schema/main/*", + } + + os.MkdirAll(path, 0o700) + mainPath := filepath.Join(path, "main.db") + mainBase, err := openBase(mainPath, maxDBConns, pragmas, schemas, nil) if err != nil { - return nil, wrap(err) + return nil, err } - sqlDB.SetMaxOpenConns(maxDBConns) - if _, err := sqlDB.Exec(`PRAGMA journal_mode = WAL`); err != nil { - return nil, wrap(err, "PRAGMA journal_mode") + + db := &DB{ + pathBase: path, + baseDB: mainBase, + folderDBs: make(map[string]*folderDB), + folderDBOpener: openFolderDB, } - if _, err := sqlDB.Exec(`PRAGMA optimize = 0x10002`); err != nil { - // https://www.sqlite.org/pragma.html#pragma_optimize - return nil, wrap(err, "PRAGMA optimize") - } - return openCommon(sqlDB, opts...) + + return db, nil } // Open the database with options suitable for the migration inserts. This // is not a safe mode of operation for normal processing, use only for bulk // inserts with a close afterwards. func OpenForMigration(path string) (*DB, error) { - sqlDB, err := sqlx.Open(dbDriver, "file:"+path+"?"+commonOptions) + pragmas := []string{ + "journal_mode = OFF", + "default_temp_store = MEMORY", + "temp_store = MEMORY", + "foreign_keys = 0", + "synchronous = 0", + "locking_mode = EXCLUSIVE", + } + schemas := []string{ + "sql/schema/common/*", + "sql/schema/main/*", + } + + os.MkdirAll(path, 0o700) + mainPath := filepath.Join(path, "main.db") + mainBase, err := openBase(mainPath, 1, pragmas, schemas, nil) if err != nil { - return nil, wrap(err, "open") + return nil, err } - sqlDB.SetMaxOpenConns(1) - if _, err := sqlDB.Exec(`PRAGMA foreign_keys = 0`); err != nil { - return nil, wrap(err, "PRAGMA foreign_keys") + + db := &DB{ + pathBase: path, + baseDB: mainBase, + folderDBs: make(map[string]*folderDB), + folderDBOpener: openFolderDBForMigration, } - if _, err := sqlDB.Exec(`PRAGMA journal_mode = OFF`); err != nil { - return nil, wrap(err, "PRAGMA journal_mode") - } - if _, err := sqlDB.Exec(`PRAGMA synchronous = 0`); err != nil { - return nil, wrap(err, "PRAGMA synchronous") - } - return openCommon(sqlDB) + + // // Touch device IDs that should always exist and have a low index + // // numbers, and will never change + // db.localDeviceIdx, _ = db.deviceIdxLocked(protocol.LocalDeviceID) + // db.tplInput["LocalDeviceIdx"] = db.localDeviceIdx + + return db, nil } func OpenTemp() (*DB, error) { @@ -73,134 +120,12 @@ func OpenTemp() (*DB, error) { return Open(path) } -func openCommon(sqlDB *sqlx.DB, opts ...Option) (*DB, error) { - if _, err := sqlDB.Exec(`PRAGMA auto_vacuum = INCREMENTAL`); err != nil { - return nil, wrap(err, "PRAGMA auto_vacuum") +func (s *DB) Close() error { + s.folderDBsMut.Lock() + defer s.folderDBsMut.Unlock() + for folder, fdb := range s.folderDBs { + fdb.Close() + delete(s.folderDBs, folder) } - if _, err := sqlDB.Exec(`PRAGMA default_temp_store = MEMORY`); err != nil { - return nil, wrap(err, "PRAGMA default_temp_store") - } - if _, err := sqlDB.Exec(`PRAGMA temp_store = MEMORY`); err != nil { - return nil, wrap(err, "PRAGMA temp_store") - } - - db := &DB{ - sql: sqlDB, - deleteRetention: defaultDeleteRetention, - statements: make(map[string]*sqlx.Stmt), - } - for _, opt := range opts { - opt(db) - } - if db.deleteRetention > 0 && db.deleteRetention < minDeleteRetention { - db.deleteRetention = minDeleteRetention - } - - if err := db.runScripts("sql/schema/*"); err != nil { - return nil, wrap(err) - } - - ver, _ := db.getAppliedSchemaVersion() - if ver.SchemaVersion > 0 { - filter := func(scr string) bool { - scr = filepath.Base(scr) - nstr, _, ok := strings.Cut(scr, "-") - if !ok { - return false - } - n, err := strconv.ParseInt(nstr, 10, 32) - if err != nil { - return false - } - return int(n) > ver.SchemaVersion - } - if err := db.runScripts("sql/migrations/*", filter); err != nil { - return nil, wrap(err) - } - } - - // Touch device IDs that should always exist and have a low index - // numbers, and will never change - db.localDeviceIdx, _ = db.deviceIdxLocked(protocol.LocalDeviceID) - - // Set the current schema version, if not already set - if err := db.setAppliedSchemaVersion(currentSchemaVersion); err != nil { - return nil, wrap(err) - } - - db.tplInput = map[string]any{ - "FlagLocalUnsupported": protocol.FlagLocalUnsupported, - "FlagLocalIgnored": protocol.FlagLocalIgnored, - "FlagLocalMustRescan": protocol.FlagLocalMustRescan, - "FlagLocalReceiveOnly": protocol.FlagLocalReceiveOnly, - "FlagLocalGlobal": protocol.FlagLocalGlobal, - "FlagLocalNeeded": protocol.FlagLocalNeeded, - "LocalDeviceIdx": db.localDeviceIdx, - "SyncthingVersion": build.LongVersion, - } - - return db, nil + return wrap(s.baseDB.Close()) } - -var tplFuncs = template.FuncMap{ - "or": func(vs ...int) int { - v := vs[0] - for _, ov := range vs[1:] { - v |= ov - } - return v - }, -} - -// stmt returns a prepared statement for the given SQL string, after -// applying local template expansions. The statement is cached. -func (s *DB) stmt(tpl string) stmt { - tpl = strings.TrimSpace(tpl) - - // Fast concurrent lookup of cached statement - s.statementsMut.RLock() - stmt, ok := s.statements[tpl] - s.statementsMut.RUnlock() - if ok { - return stmt - } - - // On miss, take the full lock, check again - s.statementsMut.Lock() - defer s.statementsMut.Unlock() - stmt, ok = s.statements[tpl] - if ok { - return stmt - } - - // Apply template expansions - var sb strings.Builder - compTpl := template.Must(template.New("tpl").Funcs(tplFuncs).Parse(tpl)) - if err := compTpl.Execute(&sb, s.tplInput); err != nil { - panic("bug: bad template: " + err.Error()) - } - - // Prepare and cache - stmt, err := s.sql.Preparex(sb.String()) - if err != nil { - return failedStmt{err} - } - s.statements[tpl] = stmt - return stmt -} - -type stmt interface { - Exec(args ...any) (sql.Result, error) - Get(dest any, args ...any) error - Queryx(args ...any) (*sqlx.Rows, error) - Select(dest any, args ...any) error -} - -type failedStmt struct { - err error -} - -func (f failedStmt) Exec(_ ...any) (sql.Result, error) { return nil, f.err } -func (f failedStmt) Get(_ any, _ ...any) error { return f.err } -func (f failedStmt) Queryx(_ ...any) (*sqlx.Rows, error) { return nil, f.err } -func (f failedStmt) Select(_ any, _ ...any) error { return f.err } diff --git a/internal/db/sqlite/db_schema.go b/internal/db/sqlite/db_schema.go deleted file mode 100644 index 89cebd7ac..000000000 --- a/internal/db/sqlite/db_schema.go +++ /dev/null @@ -1,88 +0,0 @@ -// Copyright (C) 2025 The Syncthing Authors. -// -// This Source Code Form is subject to the terms of the Mozilla Public -// License, v. 2.0. If a copy of the MPL was not distributed with this file, -// You can obtain one at https://mozilla.org/MPL/2.0/. - -package sqlite - -import ( - "embed" - "io/fs" - "strings" - "time" - - "github.com/syncthing/syncthing/lib/build" -) - -const currentSchemaVersion = 1 - -//go:embed sql/** -var embedded embed.FS - -func (s *DB) runScripts(glob string, filter ...func(s string) bool) error { - scripts, err := fs.Glob(embedded, glob) - if err != nil { - return wrap(err) - } - - tx, err := s.sql.Begin() - if err != nil { - return wrap(err) - } - defer tx.Rollback() //nolint:errcheck - -nextScript: - for _, scr := range scripts { - for _, fn := range filter { - if !fn(scr) { - l.Debugln("Skipping script", scr) - continue nextScript - } - } - l.Debugln("Executing script", scr) - bs, err := fs.ReadFile(embedded, scr) - if err != nil { - return wrap(err, scr) - } - // SQLite requires one statement per exec, so we split the init - // files on lines containing only a semicolon and execute them - // separately. We require it on a separate line because there are - // also statement-internal semicolons in the triggers. - for _, stmt := range strings.Split(string(bs), "\n;") { - if _, err := tx.Exec(stmt); err != nil { - return wrap(err, stmt) - } - } - } - - return wrap(tx.Commit()) -} - -type schemaVersion struct { - SchemaVersion int - AppliedAt int64 - SyncthingVersion string -} - -func (s *schemaVersion) AppliedTime() time.Time { - return time.Unix(0, s.AppliedAt) -} - -func (s *DB) setAppliedSchemaVersion(ver int) error { - _, err := s.stmt(` - INSERT OR IGNORE INTO schemamigrations (schema_version, applied_at, syncthing_version) - VALUES (?, ?, ?) - `).Exec(ver, time.Now().UnixNano(), build.LongVersion) - return wrap(err) -} - -func (s *DB) getAppliedSchemaVersion() (schemaVersion, error) { - var v schemaVersion - err := s.stmt(` - SELECT schema_version as schemaversion, applied_at as appliedat, syncthing_version as syncthingversion FROM schemamigrations - ORDER BY schema_version DESC - LIMIT 1 - `).Get(&v) - return v, wrap(err) -} diff --git a/internal/db/sqlite/db_service.go b/internal/db/sqlite/db_service.go index c31acb19a..c28a6586c 100644 --- a/internal/db/sqlite/db_service.go +++ b/internal/db/sqlite/db_service.go @@ -11,7 +11,9 @@ import ( "fmt" "time" + "github.com/jmoiron/sqlx" "github.com/syncthing/syncthing/internal/db" + "github.com/thejerf/suture/v4" ) const ( @@ -21,6 +23,10 @@ const ( minDeleteRetention = 24 * time.Hour ) +func (s *DB) Service(maintenanceInterval time.Duration) suture.Service { + return newService(s, maintenanceInterval) +} + type Service struct { sdb *DB maintenanceInterval time.Duration @@ -80,14 +86,25 @@ func (s *Service) periodic(ctx context.Context) error { t1 := time.Now() defer func() { l.Debugln("Periodic done in", time.Since(t1), "+", t1.Sub(t0)) }() - if err := s.garbageCollectOldDeletedLocked(); err != nil { - return wrap(err) - } - if err := s.garbageCollectBlocklistsAndBlocksLocked(ctx); err != nil { - return wrap(err) - } + tidy(ctx, s.sdb.sql) - conn, err := s.sdb.sql.Conn(ctx) + return wrap(s.sdb.forEachFolder(func(fdb *folderDB) error { + fdb.updateLock.Lock() + defer fdb.updateLock.Unlock() + + if err := garbageCollectOldDeletedLocked(fdb); err != nil { + return wrap(err) + } + if err := garbageCollectBlocklistsAndBlocksLocked(ctx, fdb); err != nil { + return wrap(err) + } + tidy(ctx, fdb.sql) + return nil + })) +} + +func tidy(ctx context.Context, db *sqlx.DB) error { + conn, err := db.Conn(ctx) if err != nil { return wrap(err) } @@ -95,35 +112,34 @@ func (s *Service) periodic(ctx context.Context) error { _, _ = conn.ExecContext(ctx, `ANALYZE`) _, _ = conn.ExecContext(ctx, `PRAGMA optimize`) _, _ = conn.ExecContext(ctx, `PRAGMA incremental_vacuum`) - _, _ = conn.ExecContext(ctx, `PRAGMA journal_size_limit = 67108864`) + _, _ = conn.ExecContext(ctx, `PRAGMA journal_size_limit = 8388608`) _, _ = conn.ExecContext(ctx, `PRAGMA wal_checkpoint(TRUNCATE)`) - return nil } -func (s *Service) garbageCollectOldDeletedLocked() error { - if s.sdb.deleteRetention <= 0 { - l.Debugln("Delete retention is infinite, skipping cleanup") +func garbageCollectOldDeletedLocked(fdb *folderDB) error { + if fdb.deleteRetention <= 0 { + l.Debugln(fdb.baseName, "delete retention is infinite, skipping cleanup") return nil } // Remove deleted files that are marked as not needed (we have processed // them) and they were deleted more than MaxDeletedFileAge ago. - l.Debugln("Forgetting deleted files older than", s.sdb.deleteRetention) - res, err := s.sdb.stmt(` + l.Debugln(fdb.baseName, "forgetting deleted files older than", fdb.deleteRetention) + res, err := fdb.stmt(` DELETE FROM files WHERE deleted AND modified < ? AND local_flags & {{.FlagLocalNeeded}} == 0 - `).Exec(time.Now().Add(-s.sdb.deleteRetention).UnixNano()) + `).Exec(time.Now().Add(-fdb.deleteRetention).UnixNano()) if err != nil { return wrap(err) } if aff, err := res.RowsAffected(); err == nil { - l.Debugln("Removed old deleted file records:", aff) + l.Debugln(fdb.baseName, "removed old deleted file records:", aff) } return nil } -func (s *Service) garbageCollectBlocklistsAndBlocksLocked(ctx context.Context) error { +func garbageCollectBlocklistsAndBlocksLocked(ctx context.Context, fdb *folderDB) error { // Remove all blocklists not referred to by any files and, by extension, // any blocks not referred to by a blocklist. This is an expensive // operation when run normally, especially if there are a lot of blocks @@ -134,7 +150,7 @@ func (s *Service) garbageCollectBlocklistsAndBlocksLocked(ctx context.Context) e // an explicit connection and disabling foreign keys before starting the // transaction. We make sure to clean up on the way out. - conn, err := s.sdb.sql.Connx(ctx) + conn, err := fdb.sql.Connx(ctx) if err != nil { return wrap(err) } @@ -161,7 +177,7 @@ func (s *Service) garbageCollectBlocklistsAndBlocksLocked(ctx context.Context) e return wrap(err, "delete blocklists") } else if shouldDebug() { rows, err := res.RowsAffected() - l.Debugln("Blocklist GC:", rows, err) + l.Debugln(fdb.baseName, "blocklist GC:", rows, err) } if res, err := tx.ExecContext(ctx, ` @@ -172,7 +188,7 @@ func (s *Service) garbageCollectBlocklistsAndBlocksLocked(ctx context.Context) e return wrap(err, "delete blocks") } else if shouldDebug() { rows, err := res.RowsAffected() - l.Debugln("Blocks GC:", rows, err) + l.Debugln(fdb.baseName, "blocks GC:", rows, err) } return wrap(tx.Commit()) diff --git a/internal/db/sqlite/db_test.go b/internal/db/sqlite/db_test.go index 2fb7a6721..c6e2528e9 100644 --- a/internal/db/sqlite/db_test.go +++ b/internal/db/sqlite/db_test.go @@ -298,6 +298,7 @@ func TestBasics(t *testing.T) { t.Fatal(err) } if len(folders) != 1 || folders[0] != folderID { + t.Log(folders) t.Error("expected one folder") } }) @@ -1009,15 +1010,20 @@ func TestBlocklistGarbageCollection(t *testing.T) { // There should exist three blockslists and six blocks + fdb, err := sdb.getFolderDB(folderID, false) + if err != nil { + t.Fatal(err) + } + var count int - if err := sdb.sql.Get(&count, `SELECT count(*) FROM blocklists`); err != nil { + if err := fdb.sql.Get(&count, `SELECT count(*) FROM blocklists`); err != nil { t.Fatal(err) } if count != 3 { t.Log(count) t.Fatal("expected 3 blocklists") } - if err := sdb.sql.Get(&count, `SELECT count(*) FROM blocks`); err != nil { + if err := fdb.sql.Get(&count, `SELECT count(*) FROM blocks`); err != nil { t.Fatal(err) } if count != 6 { @@ -1039,14 +1045,14 @@ func TestBlocklistGarbageCollection(t *testing.T) { // There should exist two blockslists and four blocks - if err := sdb.sql.Get(&count, `SELECT count(*) FROM blocklists`); err != nil { + if err := fdb.sql.Get(&count, `SELECT count(*) FROM blocklists`); err != nil { t.Fatal(err) } if count != 2 { t.Log(count) t.Error("expected 2 blocklists") } - if err := sdb.sql.Get(&count, `SELECT count(*) FROM blocks`); err != nil { + if err := fdb.sql.Get(&count, `SELECT count(*) FROM blocks`); err != nil { t.Fatal(err) } if count != 3 { @@ -1078,7 +1084,7 @@ func TestInsertLargeFile(t *testing.T) { // Verify all the blocks are here for i, block := range files[0].Blocks { - bs, err := itererr.Collect(sdb.AllLocalBlocksWithHash(block.Hash)) + bs, err := sdb.AllLocalBlocksWithHash(block.Hash) if err != nil { t.Fatal(err) } diff --git a/internal/db/sqlite/db_update.go b/internal/db/sqlite/db_update.go index 7b8dd9905..df4e82892 100644 --- a/internal/db/sqlite/db_update.go +++ b/internal/db/sqlite/db_update.go @@ -7,477 +7,38 @@ package sqlite import ( - "cmp" - "context" "fmt" + "os" "runtime" - "slices" "strings" - - "github.com/jmoiron/sqlx" - "github.com/syncthing/syncthing/internal/gen/dbproto" - "github.com/syncthing/syncthing/internal/itererr" - "github.com/syncthing/syncthing/lib/osutil" - "github.com/syncthing/syncthing/lib/protocol" - "github.com/syncthing/syncthing/lib/sliceutil" - "google.golang.org/protobuf/proto" ) -const ( - // Arbitrarily chosen values for checkpoint frequency.... - updatePointsPerFile = 100 - updatePointsPerBlock = 1 - updatePointsThreshold = 250_000 -) - -func (s *DB) Update(folder string, device protocol.DeviceID, fs []protocol.FileInfo) error { - s.updateLock.Lock() - defer s.updateLock.Unlock() - - folderIdx, err := s.folderIdxLocked(folder) - if err != nil { - return wrap(err) - } - deviceIdx, err := s.deviceIdxLocked(device) - if err != nil { - return wrap(err) - } - - tx, err := s.sql.BeginTxx(context.Background(), nil) - if err != nil { - return wrap(err) - } - defer tx.Rollback() //nolint:errcheck - txp := &txPreparedStmts{Tx: tx} - - //nolint:sqlclosecheck - insertFileStmt, err := txp.Preparex(` - INSERT OR REPLACE INTO files (folder_idx, device_idx, remote_sequence, name, type, modified, size, version, deleted, invalid, local_flags, blocklist_hash) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) - RETURNING sequence - `) - if err != nil { - return wrap(err, "prepare insert file") - } - - //nolint:sqlclosecheck - insertFileInfoStmt, err := txp.Preparex(` - INSERT INTO fileinfos (sequence, fiprotobuf) - VALUES (?, ?) - `) - if err != nil { - return wrap(err, "prepare insert fileinfo") - } - - //nolint:sqlclosecheck - insertBlockListStmt, err := txp.Preparex(` - INSERT OR IGNORE INTO blocklists (blocklist_hash, blprotobuf) - VALUES (?, ?) - `) - if err != nil { - return wrap(err, "prepare insert blocklist") - } - - var prevRemoteSeq int64 - for i, f := range fs { - f.Name = osutil.NormalizedFilename(f.Name) - - var blockshash *[]byte - if len(f.Blocks) > 0 { - f.BlocksHash = protocol.BlocksHash(f.Blocks) - blockshash = &f.BlocksHash - } else { - f.BlocksHash = nil - } - - if f.Type == protocol.FileInfoTypeDirectory { - f.Size = 128 // synthetic directory size - } - - // Insert the file. - // - // If it is a remote file, set remote_sequence otherwise leave it at - // null. Returns the new local sequence. - var remoteSeq *int64 - if device != protocol.LocalDeviceID { - if i > 0 && f.Sequence == prevRemoteSeq { - return fmt.Errorf("duplicate remote sequence number %d", prevRemoteSeq) - } - prevRemoteSeq = f.Sequence - remoteSeq = &f.Sequence - } - var localSeq int64 - if err := insertFileStmt.Get(&localSeq, folderIdx, deviceIdx, remoteSeq, f.Name, f.Type, f.ModTime().UnixNano(), f.Size, f.Version.String(), f.IsDeleted(), f.IsInvalid(), f.LocalFlags, blockshash); err != nil { - return wrap(err, "insert file") - } - - if len(f.Blocks) > 0 { - // Indirect the block list - blocks := sliceutil.Map(f.Blocks, protocol.BlockInfo.ToWire) - bs, err := proto.Marshal(&dbproto.BlockList{Blocks: blocks}) - if err != nil { - return wrap(err, "marshal blocklist") - } - if _, err := insertBlockListStmt.Exec(f.BlocksHash, bs); err != nil { - return wrap(err, "insert blocklist") - } - - if device == protocol.LocalDeviceID { - // Insert all blocks - if err := s.insertBlocksLocked(txp, f.BlocksHash, f.Blocks); err != nil { - return wrap(err, "insert blocks") - } - } - - f.Blocks = nil - } - - // Insert the fileinfo - if device == protocol.LocalDeviceID { - f.Sequence = localSeq - } - bs, err := proto.Marshal(f.ToWire(true)) - if err != nil { - return wrap(err, "marshal fileinfo") - } - if _, err := insertFileInfoStmt.Exec(localSeq, bs); err != nil { - return wrap(err, "insert fileinfo") - } - - // Update global and need - if err := s.recalcGlobalForFile(txp, folderIdx, f.Name); err != nil { - return wrap(err) - } - } - - if err := tx.Commit(); err != nil { - return wrap(err) - } - - s.periodicCheckpointLocked(fs) - return nil -} - func (s *DB) DropFolder(folder string) error { + s.folderDBsMut.Lock() + defer s.folderDBsMut.Unlock() s.updateLock.Lock() defer s.updateLock.Unlock() _, err := s.stmt(` DELETE FROM folders WHERE folder_id = ? `).Exec(folder) + if fdb, ok := s.folderDBs[folder]; ok { + fdb.Close() + _ = os.Remove(fdb.path) + _ = os.Remove(fdb.path + "-wal") + _ = os.Remove(fdb.path + "-shm") + delete(s.folderDBs, folder) + } return wrap(err) } -func (s *DB) DropDevice(device protocol.DeviceID) error { - if device == protocol.LocalDeviceID { - panic("bug: cannot drop local device") - } - - s.updateLock.Lock() - defer s.updateLock.Unlock() - - deviceIdx, err := s.deviceIdxLocked(device) - if err != nil { - return wrap(err) - } - - tx, err := s.sql.BeginTxx(context.Background(), nil) - if err != nil { - return wrap(err) - } - defer tx.Rollback() //nolint:errcheck - txp := &txPreparedStmts{Tx: tx} - - // Find all folders where the device is involved - var folderIdxs []int64 - if err := tx.Select(&folderIdxs, ` - SELECT folder_idx - FROM counts - WHERE device_idx = ? AND count > 0 - GROUP BY folder_idx - `, deviceIdx); err != nil { - return wrap(err) - } - - // Drop the device, which cascades to delete all files etc for it - if _, err := tx.Exec(`DELETE FROM devices WHERE device_id = ?`, device.String()); err != nil { - return wrap(err) - } - - // Recalc the globals for all affected folders - for _, idx := range folderIdxs { - if err := s.recalcGlobalForFolder(txp, idx); err != nil { - return wrap(err) - } - } - - return wrap(tx.Commit()) -} - -func (s *DB) DropAllFiles(folder string, device protocol.DeviceID) error { - s.updateLock.Lock() - defer s.updateLock.Unlock() - - // This is a two part operation, first dropping all the files and then - // recalculating the global state for the entire folder. - - folderIdx, err := s.folderIdxLocked(folder) - if err != nil { - return wrap(err) - } - deviceIdx, err := s.deviceIdxLocked(device) - if err != nil { - return wrap(err) - } - - tx, err := s.sql.BeginTxx(context.Background(), nil) - if err != nil { - return wrap(err) - } - defer tx.Rollback() //nolint:errcheck - txp := &txPreparedStmts{Tx: tx} - - // Drop all the file entries - - result, err := tx.Exec(` - DELETE FROM files - WHERE folder_idx = ? AND device_idx = ? - `, folderIdx, deviceIdx) - if err != nil { - return wrap(err) - } - if n, err := result.RowsAffected(); err == nil && n == 0 { - // The delete affected no rows, so we don't need to redo the entire - // global/need calculation. - return wrap(tx.Commit()) - } - - // Recalc global for the entire folder - - if err := s.recalcGlobalForFolder(txp, folderIdx); err != nil { - return wrap(err) - } - return wrap(tx.Commit()) -} - -func (s *DB) DropFilesNamed(folder string, device protocol.DeviceID, names []string) error { - for i := range names { - names[i] = osutil.NormalizedFilename(names[i]) - } - - s.updateLock.Lock() - defer s.updateLock.Unlock() - - folderIdx, err := s.folderIdxLocked(folder) - if err != nil { - return wrap(err) - } - deviceIdx, err := s.deviceIdxLocked(device) - if err != nil { - return wrap(err) - } - - tx, err := s.sql.BeginTxx(context.Background(), nil) - if err != nil { - return wrap(err) - } - defer tx.Rollback() //nolint:errcheck - txp := &txPreparedStmts{Tx: tx} - - // Drop the named files - - query, args, err := sqlx.In(` - DELETE FROM files - WHERE folder_idx = ? AND device_idx = ? AND name IN (?) - `, folderIdx, deviceIdx, names) - if err != nil { - return wrap(err) - } - if _, err := tx.Exec(query, args...); err != nil { - return wrap(err) - } - - // Recalc globals for the named files - - for _, name := range names { - if err := s.recalcGlobalForFile(txp, folderIdx, name); err != nil { - return wrap(err) - } - } - - return wrap(tx.Commit()) -} - -func (*DB) insertBlocksLocked(tx *txPreparedStmts, blocklistHash []byte, blocks []protocol.BlockInfo) error { - if len(blocks) == 0 { - return nil - } - bs := make([]map[string]any, len(blocks)) - for i, b := range blocks { - bs[i] = map[string]any{ - "hash": b.Hash, - "blocklist_hash": blocklistHash, - "idx": i, - "offset": b.Offset, - "size": b.Size, - } - } - - // Very large block lists (>8000 blocks) result in "too many variables" - // error. Chunk it to a reasonable size. - for chunk := range slices.Chunk(bs, 1000) { - if _, err := tx.NamedExec(` - INSERT OR IGNORE INTO blocks (hash, blocklist_hash, idx, offset, size) - VALUES (:hash, :blocklist_hash, :idx, :offset, :size) - `, chunk); err != nil { - return wrap(err) - } - } - return nil -} - -func (s *DB) recalcGlobalForFolder(txp *txPreparedStmts, folderIdx int64) error { - // Select files where there is no global, those are the ones we need to - // recalculate. - //nolint:sqlclosecheck - namesStmt, err := txp.Preparex(` - SELECT f.name FROM files f - WHERE f.folder_idx = ? AND NOT EXISTS ( - SELECT 1 FROM files g - WHERE g.folder_idx = ? AND g.name = f.name AND g.local_flags & ? != 0 - ) - GROUP BY name - `) - if err != nil { - return wrap(err) - } - rows, err := namesStmt.Queryx(folderIdx, folderIdx, protocol.FlagLocalGlobal) - if err != nil { - return wrap(err) - } - defer rows.Close() - for rows.Next() { - var name string - if err := rows.Scan(&name); err != nil { - return wrap(err) - } - if err := s.recalcGlobalForFile(txp, folderIdx, name); err != nil { - return wrap(err) - } - } - return wrap(rows.Err()) -} - -func (s *DB) recalcGlobalForFile(txp *txPreparedStmts, folderIdx int64, file string) error { - //nolint:sqlclosecheck - selStmt, err := txp.Preparex(` - SELECT name, folder_idx, device_idx, sequence, modified, version, deleted, invalid, local_flags FROM files - WHERE folder_idx = ? AND name = ? - `) - if err != nil { - return wrap(err) - } - es, err := itererr.Collect(iterStructs[fileRow](selStmt.Queryx(folderIdx, file))) - if err != nil { - return wrap(err) - } - if len(es) == 0 { - // shouldn't happen - return nil - } - - // Sort the entries; the global entry is at the head of the list - slices.SortFunc(es, fileRow.Compare) - - // The global version is the first one in the list that is not invalid, - // or just the first one in the list if all are invalid. - var global fileRow - globIdx := slices.IndexFunc(es, func(e fileRow) bool { return !e.Invalid }) - if globIdx < 0 { - globIdx = 0 - } - global = es[globIdx] - - // We "have" the file if the position in the list of versions is at the - // global version or better, or if the version is the same as the global - // file (we might be further down the list due to invalid flags), or if - // the global is deleted and we don't have it at all... - localIdx := slices.IndexFunc(es, func(e fileRow) bool { return e.DeviceIdx == s.localDeviceIdx }) - hasLocal := localIdx >= 0 && localIdx <= globIdx || // have a better or equal version - localIdx >= 0 && es[localIdx].Version.Equal(global.Version.Vector) || // have an equal version but invalid/ignored - localIdx < 0 && global.Deleted // missing it, but the global is also deleted - - // Set the global flag on the global entry. Set the need flag if the - // local device needs this file, unless it's invalid. - global.LocalFlags |= protocol.FlagLocalGlobal - if hasLocal || global.Invalid { - global.LocalFlags &= ^protocol.FlagLocalNeeded - } else { - global.LocalFlags |= protocol.FlagLocalNeeded - } - //nolint:sqlclosecheck - upStmt, err := txp.Prepare(` - UPDATE files SET local_flags = ? - WHERE folder_idx = ? AND device_idx = ? AND sequence = ? - `) - if err != nil { - return wrap(err) - } - if _, err := upStmt.Exec(global.LocalFlags, global.FolderIdx, global.DeviceIdx, global.Sequence); err != nil { - return wrap(err) - } - - // Clear the need and global flags on all other entries - //nolint:sqlclosecheck - upStmt, err = txp.Prepare(` - UPDATE files SET local_flags = local_flags & ? - WHERE folder_idx = ? AND name = ? AND sequence != ? AND local_flags & ? != 0 - `) - if err != nil { - return wrap(err) - } - if _, err := upStmt.Exec(^(protocol.FlagLocalNeeded | protocol.FlagLocalGlobal), folderIdx, global.Name, global.Sequence, protocol.FlagLocalNeeded|protocol.FlagLocalGlobal); err != nil { - return wrap(err) - } - - return nil -} - -func (s *DB) folderIdxLocked(folderID string) (int64, error) { - if _, err := s.stmt(` - INSERT OR IGNORE INTO folders(folder_id) - VALUES (?) - `).Exec(folderID); err != nil { - return 0, wrap(err) - } - var idx int64 - if err := s.stmt(` - SELECT idx FROM folders - WHERE folder_id = ? - `).Get(&idx, folderID); err != nil { - return 0, wrap(err) - } - - return idx, nil -} - -func (s *DB) deviceIdxLocked(deviceID protocol.DeviceID) (int64, error) { - devStr := deviceID.String() - if _, err := s.stmt(` - INSERT OR IGNORE INTO devices(device_id) - VALUES (?) - `).Exec(devStr); err != nil { - return 0, wrap(err) - } - var idx int64 - if err := s.stmt(` - SELECT idx FROM devices - WHERE device_id = ? - `).Get(&idx, devStr); err != nil { - return 0, wrap(err) - } - - return idx, nil +func (s *DB) ListFolders() ([]string, error) { + var res []string + err := s.stmt(` + SELECT folder_id FROM folders + ORDER BY folder_id + `).Select(&res) + return res, wrap(err) } // wrap returns the error wrapped with the calling function name and @@ -507,94 +68,3 @@ func wrap(err error, context ...string) error { return fmt.Errorf("%s: %w", prefix, err) } - -type fileRow struct { - Name string - Version dbVector - FolderIdx int64 `db:"folder_idx"` - DeviceIdx int64 `db:"device_idx"` - Sequence int64 - Modified int64 - Size int64 - LocalFlags int64 `db:"local_flags"` - Deleted bool - Invalid bool -} - -func (e fileRow) Compare(other fileRow) int { - // From FileInfo.WinsConflict - vc := e.Version.Vector.Compare(other.Version.Vector) - switch vc { - case protocol.Equal: - if e.Invalid != other.Invalid { - if e.Invalid { - return 1 - } - return -1 - } - - // Compare the device ID index, lower is better. This is only - // deterministic to the extent that LocalDeviceID will always be the - // lowest one, order between remote devices is random (and - // irrelevant). - return cmp.Compare(e.DeviceIdx, other.DeviceIdx) - case protocol.Greater: // we are newer - return -1 - case protocol.Lesser: // we are older - return 1 - case protocol.ConcurrentGreater, protocol.ConcurrentLesser: // there is a conflict - if e.Invalid != other.Invalid { - if e.Invalid { // we are invalid, we lose - return 1 - } - return -1 // they are invalid, we win - } - if e.Deleted != other.Deleted { - if e.Deleted { // we are deleted, we lose - return 1 - } - return -1 // they are deleted, we win - } - if d := cmp.Compare(e.Modified, other.Modified); d != 0 { - return -d // positive d means we were newer, so we win (negative return) - } - if vc == protocol.ConcurrentGreater { - return -1 // we have a better device ID, we win - } - return 1 // they win - default: - return 0 - } -} - -func (s *DB) periodicCheckpointLocked(fs []protocol.FileInfo) { - // Induce periodic checkpoints. We add points for each file and block, - // and checkpoint when we've written more than a threshold of points. - // This ensures we do not go too long without a checkpoint, while also - // not doing it incessantly for every update. - s.updatePoints += updatePointsPerFile * len(fs) - for _, f := range fs { - s.updatePoints += len(f.Blocks) * updatePointsPerBlock - } - if s.updatePoints > updatePointsThreshold { - conn, err := s.sql.Conn(context.Background()) - if err != nil { - l.Debugln("conn:", err) - return - } - defer conn.Close() - if _, err := conn.ExecContext(context.Background(), `PRAGMA journal_size_limit = 67108864`); err != nil { - l.Debugln("PRAGMA journal_size_limit:", err) - } - row := conn.QueryRowContext(context.Background(), `PRAGMA wal_checkpoint(RESTART)`) - var res, modified, moved int - if row.Err() != nil { - l.Debugln("PRAGMA wal_checkpoint(RESTART):", err) - } else if err := row.Scan(&res, &modified, &moved); err != nil { - l.Debugln("PRAGMA wal_checkpoint(RESTART) (scan):", err) - } else { - l.Debugln("checkpoint at", s.updatePoints, "returned", res, modified, moved) - } - s.updatePoints = 0 - } -} diff --git a/internal/db/sqlite/db_counts.go b/internal/db/sqlite/folderdb_counts.go similarity index 60% rename from internal/db/sqlite/db_counts.go rename to internal/db/sqlite/folderdb_counts.go index 24700dd3f..0e019f15a 100644 --- a/internal/db/sqlite/db_counts.go +++ b/internal/db/sqlite/folderdb_counts.go @@ -19,95 +19,89 @@ type countsRow struct { LocalFlags int64 `db:"local_flags"` } -func (s *DB) CountLocal(folder string, device protocol.DeviceID) (db.Counts, error) { +func (s *folderDB) CountLocal(device protocol.DeviceID) (db.Counts, error) { var res []countsRow if err := s.stmt(` SELECT s.type, s.count, s.size, s.local_flags, s.deleted FROM counts s - INNER JOIN folders o ON o.idx = s.folder_idx INNER JOIN devices d ON d.idx = s.device_idx - WHERE o.folder_id = ? AND d.device_id = ? AND s.local_flags & {{.FlagLocalIgnored}} = 0 - `).Select(&res, folder, device.String()); err != nil { + WHERE d.device_id = ? AND s.local_flags & {{.FlagLocalIgnored}} = 0 + `).Select(&res, device.String()); err != nil { return db.Counts{}, wrap(err) } return summarizeCounts(res), nil } -func (s *DB) CountNeed(folder string, device protocol.DeviceID) (db.Counts, error) { +func (s *folderDB) CountNeed(device protocol.DeviceID) (db.Counts, error) { if device == protocol.LocalDeviceID { - return s.needSizeLocal(folder) + return s.needSizeLocal() } - return s.needSizeRemote(folder, device) + return s.needSizeRemote(device) } -func (s *DB) CountGlobal(folder string) (db.Counts, error) { +func (s *folderDB) CountGlobal() (db.Counts, error) { // Exclude ignored and receive-only changed files from the global count // (legacy expectation? it's a bit weird since those files can in fact // be global and you can get them with GetGlobal etc.) var res []countsRow err := s.stmt(` SELECT s.type, s.count, s.size, s.local_flags, s.deleted FROM counts s - INNER JOIN folders o ON o.idx = s.folder_idx - WHERE o.folder_id = ? AND s.local_flags & {{.FlagLocalGlobal}} != 0 AND s.local_flags & {{or .FlagLocalReceiveOnly .FlagLocalIgnored}} = 0 - `).Select(&res, folder) + WHERE s.local_flags & {{.FlagLocalGlobal}} != 0 AND s.local_flags & {{or .FlagLocalReceiveOnly .FlagLocalIgnored}} = 0 + `).Select(&res) if err != nil { return db.Counts{}, wrap(err) } return summarizeCounts(res), nil } -func (s *DB) CountReceiveOnlyChanged(folder string) (db.Counts, error) { +func (s *folderDB) CountReceiveOnlyChanged() (db.Counts, error) { var res []countsRow err := s.stmt(` SELECT s.type, s.count, s.size, s.local_flags, s.deleted FROM counts s - INNER JOIN folders o ON o.idx = s.folder_idx - WHERE o.folder_id = ? AND local_flags & {{.FlagLocalReceiveOnly}} != 0 - `).Select(&res, folder) + WHERE local_flags & {{.FlagLocalReceiveOnly}} != 0 + `).Select(&res) if err != nil { return db.Counts{}, wrap(err) } return summarizeCounts(res), nil } -func (s *DB) needSizeLocal(folder string) (db.Counts, error) { +func (s *folderDB) needSizeLocal() (db.Counts, error) { // The need size for the local device is the sum of entries with the // need bit set. var res []countsRow err := s.stmt(` SELECT s.type, s.count, s.size, s.local_flags, s.deleted FROM counts s - INNER JOIN folders o ON o.idx = s.folder_idx - WHERE o.folder_id = ? AND s.local_flags & {{.FlagLocalNeeded}} != 0 - `).Select(&res, folder) + WHERE s.local_flags & {{.FlagLocalNeeded}} != 0 + `).Select(&res) if err != nil { return db.Counts{}, wrap(err) } return summarizeCounts(res), nil } -func (s *DB) needSizeRemote(folder string, device protocol.DeviceID) (db.Counts, error) { +func (s *folderDB) needSizeRemote(device protocol.DeviceID) (db.Counts, error) { var res []countsRow // See neededGlobalFilesRemote for commentary as that is the same query without summing if err := s.stmt(` SELECT g.type, count(*) as count, sum(g.size) as size, g.local_flags, g.deleted FROM files g - INNER JOIN folders o ON o.idx = g.folder_idx - WHERE o.folder_id = ? AND g.local_flags & {{.FlagLocalGlobal}} != 0 AND NOT g.deleted AND NOT g.invalid AND NOT EXISTS ( + WHERE g.local_flags & {{.FlagLocalGlobal}} != 0 AND NOT g.deleted AND NOT g.invalid AND NOT EXISTS ( SELECT 1 FROM FILES f INNER JOIN devices d ON d.idx = f.device_idx - WHERE f.name = g.name AND f.version = g.version AND f.folder_idx = g.folder_idx AND d.device_id = ? + WHERE f.name = g.name AND f.version = g.version AND d.device_id = ? ) GROUP BY g.type, g.local_flags, g.deleted UNION ALL SELECT g.type, count(*) as count, sum(g.size) as size, g.local_flags, g.deleted FROM files g - INNER JOIN folders o ON o.idx = g.folder_idx - WHERE o.folder_id = ? AND g.local_flags & {{.FlagLocalGlobal}} != 0 AND g.deleted AND NOT g.invalid AND EXISTS ( + WHERE g.local_flags & {{.FlagLocalGlobal}} != 0 AND g.deleted AND NOT g.invalid AND EXISTS ( SELECT 1 FROM FILES f INNER JOIN devices d ON d.idx = f.device_idx - WHERE f.name = g.name AND f.folder_idx = g.folder_idx AND d.device_id = ? AND NOT f.deleted + WHERE f.name = g.name AND d.device_id = ? AND NOT f.deleted ) GROUP BY g.type, g.local_flags, g.deleted - `).Select(&res, folder, device.String(), - folder, device.String()); err != nil { + `).Select(&res, device.String(), + device.String()); err != nil { return db.Counts{}, wrap(err) } diff --git a/internal/db/sqlite/db_global.go b/internal/db/sqlite/folderdb_global.go similarity index 63% rename from internal/db/sqlite/db_global.go rename to internal/db/sqlite/folderdb_global.go index 2ce7a0c93..3fc71d53c 100644 --- a/internal/db/sqlite/db_global.go +++ b/internal/db/sqlite/folderdb_global.go @@ -19,7 +19,7 @@ import ( "github.com/syncthing/syncthing/lib/protocol" ) -func (s *DB) GetGlobalFile(folder string, file string) (protocol.FileInfo, bool, error) { +func (s *folderDB) GetGlobalFile(file string) (protocol.FileInfo, bool, error) { file = osutil.NormalizedFilename(file) var ind indirectFI @@ -27,9 +27,8 @@ func (s *DB) GetGlobalFile(folder string, file string) (protocol.FileInfo, bool, SELECT fi.fiprotobuf, bl.blprotobuf FROM fileinfos fi INNER JOIN files f on fi.sequence = f.sequence LEFT JOIN blocklists bl ON bl.blocklist_hash = f.blocklist_hash - INNER JOIN folders o ON o.idx = f.folder_idx - WHERE o.folder_id = ? AND f.name = ? AND f.local_flags & {{.FlagLocalGlobal}} != 0 - `).Get(&ind, folder, file) + WHERE f.name = ? AND f.local_flags & {{.FlagLocalGlobal}} != 0 + `).Get(&ind, file) if errors.Is(err, sql.ErrNoRows) { return protocol.FileInfo{}, false, nil } @@ -43,18 +42,17 @@ func (s *DB) GetGlobalFile(folder string, file string) (protocol.FileInfo, bool, return fi, true, nil } -func (s *DB) GetGlobalAvailability(folder, file string) ([]protocol.DeviceID, error) { +func (s *folderDB) GetGlobalAvailability(file string) ([]protocol.DeviceID, error) { file = osutil.NormalizedFilename(file) var devStrs []string err := s.stmt(` SELECT d.device_id FROM files f INNER JOIN devices d ON d.idx = f.device_idx - INNER JOIN folders o ON o.idx = f.folder_idx - INNER JOIN files g ON f.folder_idx = g.folder_idx AND g.version = f.version AND g.name = f.name - WHERE o.folder_id = ? AND g.name = ? AND g.local_flags & {{.FlagLocalGlobal}} != 0 AND f.device_idx != {{.LocalDeviceIdx}} + INNER JOIN files g ON g.version = f.version AND g.name = f.name + WHERE g.name = ? AND g.local_flags & {{.FlagLocalGlobal}} != 0 AND f.device_idx != {{.LocalDeviceIdx}} ORDER BY d.device_id - `).Select(&devStrs, folder, file) + `).Select(&devStrs, file) if errors.Is(err, sql.ErrNoRows) { return nil, nil } @@ -74,22 +72,21 @@ func (s *DB) GetGlobalAvailability(folder, file string) ([]protocol.DeviceID, er return devs, nil } -func (s *DB) AllGlobalFiles(folder string) (iter.Seq[db.FileMetadata], func() error) { +func (s *folderDB) AllGlobalFiles() (iter.Seq[db.FileMetadata], func() error) { it, errFn := iterStructs[db.FileMetadata](s.stmt(` SELECT f.sequence, f.name, f.type, f.modified as modnanos, f.size, f.deleted, f.invalid, f.local_flags as localflags FROM files f - INNER JOIN folders o ON o.idx = f.folder_idx - WHERE o.folder_id = ? AND f.local_flags & {{.FlagLocalGlobal}} != 0 + WHERE f.local_flags & {{.FlagLocalGlobal}} != 0 ORDER BY f.name - `).Queryx(folder)) + `).Queryx()) return itererr.Map(it, errFn, func(m db.FileMetadata) (db.FileMetadata, error) { m.Name = osutil.NativeFilename(m.Name) return m, nil }) } -func (s *DB) AllGlobalFilesPrefix(folder string, prefix string) (iter.Seq[db.FileMetadata], func() error) { +func (s *folderDB) AllGlobalFilesPrefix(prefix string) (iter.Seq[db.FileMetadata], func() error) { if prefix == "" { - return s.AllGlobalFiles(folder) + return s.AllGlobalFiles() } prefix = osutil.NormalizedFilename(prefix) @@ -97,17 +94,16 @@ func (s *DB) AllGlobalFilesPrefix(folder string, prefix string) (iter.Seq[db.Fil it, errFn := iterStructs[db.FileMetadata](s.stmt(` SELECT f.sequence, f.name, f.type, f.modified as modnanos, f.size, f.deleted, f.invalid, f.local_flags as localflags FROM files f - INNER JOIN folders o ON o.idx = f.folder_idx - WHERE o.folder_id = ? AND f.name >= ? AND f.name < ? AND f.local_flags & {{.FlagLocalGlobal}} != 0 + WHERE f.name >= ? AND f.name < ? AND f.local_flags & {{.FlagLocalGlobal}} != 0 ORDER BY f.name - `).Queryx(folder, prefix, end)) + `).Queryx(prefix, end)) return itererr.Map(it, errFn, func(m db.FileMetadata) (db.FileMetadata, error) { m.Name = osutil.NativeFilename(m.Name) return m, nil }) } -func (s *DB) AllNeededGlobalFiles(folder string, device protocol.DeviceID, order config.PullOrder, limit, offset int) (iter.Seq[protocol.FileInfo], func() error) { +func (s *folderDB) AllNeededGlobalFiles(device protocol.DeviceID, order config.PullOrder, limit, offset int) (iter.Seq[protocol.FileInfo], func() error) { var selectOpts string switch order { case config.PullOrderRandom: @@ -132,25 +128,24 @@ func (s *DB) AllNeededGlobalFiles(folder string, device protocol.DeviceID, order } if device == protocol.LocalDeviceID { - return s.neededGlobalFilesLocal(folder, selectOpts) + return s.neededGlobalFilesLocal(selectOpts) } - return s.neededGlobalFilesRemote(folder, device, selectOpts) + return s.neededGlobalFilesRemote(device, selectOpts) } -func (s *DB) neededGlobalFilesLocal(folder, selectOpts string) (iter.Seq[protocol.FileInfo], func() error) { +func (s *folderDB) neededGlobalFilesLocal(selectOpts string) (iter.Seq[protocol.FileInfo], func() error) { // Select all the non-ignored files with the need bit set. it, errFn := iterStructs[indirectFI](s.stmt(` SELECT fi.fiprotobuf, bl.blprotobuf, g.name, g.size, g.modified FROM fileinfos fi INNER JOIN files g on fi.sequence = g.sequence LEFT JOIN blocklists bl ON bl.blocklist_hash = g.blocklist_hash - INNER JOIN folders o ON o.idx = g.folder_idx - WHERE o.folder_id = ? AND g.local_flags & {{.FlagLocalIgnored}} = 0 AND g.local_flags & {{.FlagLocalNeeded}} != 0 - ` + selectOpts).Queryx(folder)) + WHERE g.local_flags & {{.FlagLocalIgnored}} = 0 AND g.local_flags & {{.FlagLocalNeeded}} != 0 + ` + selectOpts).Queryx()) return itererr.Map(it, errFn, indirectFI.FileInfo) } -func (s *DB) neededGlobalFilesRemote(folder string, device protocol.DeviceID, selectOpts string) (iter.Seq[protocol.FileInfo], func() error) { +func (s *folderDB) neededGlobalFilesRemote(device protocol.DeviceID, selectOpts string) (iter.Seq[protocol.FileInfo], func() error) { // Select: // // - all the valid, non-deleted global files that don't have a corresponding @@ -163,11 +158,10 @@ func (s *DB) neededGlobalFilesRemote(folder string, device protocol.DeviceID, se SELECT fi.fiprotobuf, bl.blprotobuf, g.name, g.size, g.modified FROM fileinfos fi INNER JOIN files g on fi.sequence = g.sequence LEFT JOIN blocklists bl ON bl.blocklist_hash = g.blocklist_hash - INNER JOIN folders o ON o.idx = g.folder_idx - WHERE o.folder_id = ? AND g.local_flags & {{.FlagLocalGlobal}} != 0 AND NOT g.deleted AND NOT g.invalid AND NOT EXISTS ( + WHERE g.local_flags & {{.FlagLocalGlobal}} != 0 AND NOT g.deleted AND NOT g.invalid AND NOT EXISTS ( SELECT 1 FROM FILES f INNER JOIN devices d ON d.idx = f.device_idx - WHERE f.name = g.name AND f.version = g.version AND f.folder_idx = g.folder_idx AND d.device_id = ? + WHERE f.name = g.name AND f.version = g.version AND d.device_id = ? ) UNION ALL @@ -175,15 +169,14 @@ func (s *DB) neededGlobalFilesRemote(folder string, device protocol.DeviceID, se SELECT fi.fiprotobuf, bl.blprotobuf, g.name, g.size, g.modified FROM fileinfos fi INNER JOIN files g on fi.sequence = g.sequence LEFT JOIN blocklists bl ON bl.blocklist_hash = g.blocklist_hash - INNER JOIN folders o ON o.idx = g.folder_idx - WHERE o.folder_id = ? AND g.local_flags & {{.FlagLocalGlobal}} != 0 AND g.deleted AND NOT g.invalid AND EXISTS ( + WHERE g.local_flags & {{.FlagLocalGlobal}} != 0 AND g.deleted AND NOT g.invalid AND EXISTS ( SELECT 1 FROM FILES f INNER JOIN devices d ON d.idx = f.device_idx - WHERE f.name = g.name AND f.folder_idx = g.folder_idx AND d.device_id = ? AND NOT f.deleted + WHERE f.name = g.name AND d.device_id = ? AND NOT f.deleted ) `+selectOpts).Queryx( - folder, device.String(), - folder, device.String(), + device.String(), + device.String(), )) return itererr.Map(it, errFn, indirectFI.FileInfo) } diff --git a/internal/db/sqlite/db_indexid.go b/internal/db/sqlite/folderdb_indexid.go similarity index 64% rename from internal/db/sqlite/db_indexid.go rename to internal/db/sqlite/folderdb_indexid.go index e8f3fa1ab..1a4f693f5 100644 --- a/internal/db/sqlite/db_indexid.go +++ b/internal/db/sqlite/folderdb_indexid.go @@ -16,16 +16,15 @@ import ( "github.com/syncthing/syncthing/lib/protocol" ) -func (s *DB) GetIndexID(folder string, device protocol.DeviceID) (protocol.IndexID, error) { +func (s *folderDB) GetIndexID(device protocol.DeviceID) (protocol.IndexID, error) { // Try a fast read-only query to begin with. If it does not find the ID // we'll do the full thing under a lock. var indexID string if err := s.stmt(` SELECT i.index_id FROM indexids i - INNER JOIN folders o ON o.idx = i.folder_idx INNER JOIN devices d ON d.idx = i.device_idx - WHERE o.folder_id = ? AND d.device_id = ? - `).Get(&indexID, folder, device.String()); err == nil && indexID != "" { + WHERE d.device_id = ? + `).Get(&indexID, device.String()); err == nil && indexID != "" { idx, err := indexIDFromHex(indexID) return idx, wrap(err, "select") } @@ -40,14 +39,9 @@ func (s *DB) GetIndexID(folder string, device protocol.DeviceID) (protocol.Index // We are now operating only for the local device ID - folderIdx, err := s.folderIdxLocked(folder) - if err != nil { - return 0, wrap(err) - } - if err := s.stmt(` - SELECT index_id FROM indexids WHERE folder_idx = ? AND device_idx = {{.LocalDeviceIdx}} - `).Get(&indexID, folderIdx); err != nil && !errors.Is(err, sql.ErrNoRows) { + SELECT index_id FROM indexids WHERE device_idx = {{.LocalDeviceIdx}} + `).Get(&indexID); err != nil && !errors.Is(err, sql.ErrNoRows) { return 0, wrap(err, "select local") } @@ -57,11 +51,11 @@ func (s *DB) GetIndexID(folder string, device protocol.DeviceID) (protocol.Index // any. id := protocol.NewIndexID() if _, err := s.stmt(` - INSERT INTO indexids (folder_idx, device_idx, index_id, sequence) - SELECT ?, {{.LocalDeviceIdx}}, ?, COALESCE(MAX(sequence), 0) FROM files - WHERE folder_idx = ? AND device_idx = {{.LocalDeviceIdx}} + INSERT INTO indexids (device_idx, index_id, sequence) + SELECT {{.LocalDeviceIdx}}, ?, COALESCE(MAX(sequence), 0) FROM files + WHERE device_idx = {{.LocalDeviceIdx}} ON CONFLICT DO UPDATE SET index_id = ? - `).Exec(folderIdx, indexIDToHex(id), folderIdx, indexIDToHex(id)); err != nil { + `).Exec(indexIDToHex(id), indexIDToHex(id)); err != nil { return 0, wrap(err, "insert") } return id, nil @@ -70,42 +64,37 @@ func (s *DB) GetIndexID(folder string, device protocol.DeviceID) (protocol.Index return indexIDFromHex(indexID) } -func (s *DB) SetIndexID(folder string, device protocol.DeviceID, id protocol.IndexID) error { +func (s *folderDB) SetIndexID(device protocol.DeviceID, id protocol.IndexID) error { s.updateLock.Lock() defer s.updateLock.Unlock() - folderIdx, err := s.folderIdxLocked(folder) - if err != nil { - return wrap(err, "folder idx") - } deviceIdx, err := s.deviceIdxLocked(device) if err != nil { return wrap(err, "device idx") } if _, err := s.stmt(` - INSERT OR REPLACE INTO indexids (folder_idx, device_idx, index_id, sequence) values (?, ?, ?, 0) - `).Exec(folderIdx, deviceIdx, indexIDToHex(id)); err != nil { + INSERT OR REPLACE INTO indexids (device_idx, index_id, sequence) values (?, ?, 0) + `).Exec(deviceIdx, indexIDToHex(id)); err != nil { return wrap(err, "insert") } return nil } -func (s *DB) DropAllIndexIDs() error { +func (s *folderDB) DropAllIndexIDs() error { s.updateLock.Lock() defer s.updateLock.Unlock() _, err := s.stmt(`DELETE FROM indexids`).Exec() return wrap(err) } -func (s *DB) GetDeviceSequence(folder string, device protocol.DeviceID) (int64, error) { +func (s *folderDB) GetDeviceSequence(device protocol.DeviceID) (int64, error) { var res sql.NullInt64 err := s.stmt(` SELECT sequence FROM indexids i - INNER JOIN folders o ON o.idx = i.folder_idx INNER JOIN devices d ON d.idx = i.device_idx - WHERE o.folder_id = ? AND d.device_id = ? - `).Get(&res, folder, device.String()) + WHERE d.device_id = ? + `).Get(&res, device.String()) if errors.Is(err, sql.ErrNoRows) { return 0, nil } @@ -118,7 +107,7 @@ func (s *DB) GetDeviceSequence(folder string, device protocol.DeviceID) (int64, return res.Int64, nil } -func (s *DB) RemoteSequences(folder string) (map[protocol.DeviceID]int64, error) { +func (s *folderDB) RemoteSequences() (map[protocol.DeviceID]int64, error) { type row struct { Device string Seq int64 @@ -126,10 +115,9 @@ func (s *DB) RemoteSequences(folder string) (map[protocol.DeviceID]int64, error) it, errFn := iterStructs[row](s.stmt(` SELECT d.device_id AS device, i.sequence AS seq FROM indexids i - INNER JOIN folders o ON o.idx = i.folder_idx INNER JOIN devices d ON d.idx = i.device_idx - WHERE o.folder_id = ? AND i.device_idx != {{.LocalDeviceIdx}} - `).Queryx(folder)) + WHERE i.device_idx != {{.LocalDeviceIdx}} + `).Queryx()) res := make(map[protocol.DeviceID]int64) for row, err := range itererr.Zip(it, errFn) { diff --git a/internal/db/sqlite/db_local.go b/internal/db/sqlite/folderdb_local.go similarity index 59% rename from internal/db/sqlite/db_local.go rename to internal/db/sqlite/folderdb_local.go index 4b4064757..58d35fb05 100644 --- a/internal/db/sqlite/db_local.go +++ b/internal/db/sqlite/folderdb_local.go @@ -18,7 +18,7 @@ import ( "github.com/syncthing/syncthing/lib/protocol" ) -func (s *DB) GetDeviceFile(folder string, device protocol.DeviceID, file string) (protocol.FileInfo, bool, error) { +func (s *folderDB) GetDeviceFile(device protocol.DeviceID, file string) (protocol.FileInfo, bool, error) { file = osutil.NormalizedFilename(file) var ind indirectFI @@ -27,9 +27,8 @@ func (s *DB) GetDeviceFile(folder string, device protocol.DeviceID, file string) INNER JOIN files f on fi.sequence = f.sequence LEFT JOIN blocklists bl ON bl.blocklist_hash = f.blocklist_hash INNER JOIN devices d ON f.device_idx = d.idx - INNER JOIN folders o ON f.folder_idx = o.idx - WHERE o.folder_id = ? AND d.device_id = ? AND f.name = ? - `).Get(&ind, folder, device.String(), file) + WHERE d.device_id = ? AND f.name = ? + `).Get(&ind, device.String(), file) if errors.Is(err, sql.ErrNoRows) { return protocol.FileInfo{}, false, nil } @@ -43,19 +42,18 @@ func (s *DB) GetDeviceFile(folder string, device protocol.DeviceID, file string) return fi, true, nil } -func (s *DB) AllLocalFiles(folder string, device protocol.DeviceID) (iter.Seq[protocol.FileInfo], func() error) { +func (s *folderDB) AllLocalFiles(device protocol.DeviceID) (iter.Seq[protocol.FileInfo], func() error) { it, errFn := iterStructs[indirectFI](s.stmt(` SELECT fi.fiprotobuf, bl.blprotobuf FROM fileinfos fi INNER JOIN files f on fi.sequence = f.sequence LEFT JOIN blocklists bl ON bl.blocklist_hash = f.blocklist_hash - INNER JOIN folders o ON o.idx = f.folder_idx INNER JOIN devices d ON d.idx = f.device_idx - WHERE o.folder_id = ? AND d.device_id = ? - `).Queryx(folder, device.String())) + WHERE d.device_id = ? + `).Queryx(device.String())) return itererr.Map(it, errFn, indirectFI.FileInfo) } -func (s *DB) AllLocalFilesBySequence(folder string, device protocol.DeviceID, startSeq int64, limit int) (iter.Seq[protocol.FileInfo], func() error) { +func (s *folderDB) AllLocalFilesBySequence(device protocol.DeviceID, startSeq int64, limit int) (iter.Seq[protocol.FileInfo], func() error) { var limitStr string if limit > 0 { limitStr = fmt.Sprintf(" LIMIT %d", limit) @@ -64,17 +62,16 @@ func (s *DB) AllLocalFilesBySequence(folder string, device protocol.DeviceID, st SELECT fi.fiprotobuf, bl.blprotobuf FROM fileinfos fi INNER JOIN files f on fi.sequence = f.sequence LEFT JOIN blocklists bl ON bl.blocklist_hash = f.blocklist_hash - INNER JOIN folders o ON o.idx = f.folder_idx INNER JOIN devices d ON d.idx = f.device_idx - WHERE o.folder_id = ? AND d.device_id = ? AND f.sequence >= ? + WHERE d.device_id = ? AND f.sequence >= ? ORDER BY f.sequence`+limitStr).Queryx( - folder, device.String(), startSeq)) + device.String(), startSeq)) return itererr.Map(it, errFn, indirectFI.FileInfo) } -func (s *DB) AllLocalFilesWithPrefix(folder string, device protocol.DeviceID, prefix string) (iter.Seq[protocol.FileInfo], func() error) { +func (s *folderDB) AllLocalFilesWithPrefix(device protocol.DeviceID, prefix string) (iter.Seq[protocol.FileInfo], func() error) { if prefix == "" { - return s.AllLocalFiles(folder, device) + return s.AllLocalFiles(device) } prefix = osutil.NormalizedFilename(prefix) @@ -84,37 +81,20 @@ func (s *DB) AllLocalFilesWithPrefix(folder string, device protocol.DeviceID, pr SELECT fi.fiprotobuf, bl.blprotobuf FROM fileinfos fi INNER JOIN files f on fi.sequence = f.sequence LEFT JOIN blocklists bl ON bl.blocklist_hash = f.blocklist_hash - INNER JOIN folders o ON o.idx = f.folder_idx INNER JOIN devices d ON d.idx = f.device_idx - WHERE o.folder_id = ? AND d.device_id = ? AND f.name >= ? AND f.name < ? - `, folder, device.String(), prefix, end)) + WHERE d.device_id = ? AND f.name >= ? AND f.name < ? + `, device.String(), prefix, end)) return itererr.Map(it, errFn, indirectFI.FileInfo) } -func (s *DB) AllLocalFilesWithBlocksHash(folder string, h []byte) (iter.Seq[db.FileMetadata], func() error) { +func (s *folderDB) AllLocalFilesWithBlocksHash(h []byte) (iter.Seq[db.FileMetadata], func() error) { return iterStructs[db.FileMetadata](s.stmt(` SELECT f.sequence, f.name, f.type, f.modified as modnanos, f.size, f.deleted, f.invalid, f.local_flags as localflags FROM files f - INNER JOIN folders o ON o.idx = f.folder_idx - WHERE o.folder_id = ? AND f.device_idx = {{.LocalDeviceIdx}} AND f.blocklist_hash = ? - `).Queryx(folder, h)) -} - -func (s *DB) AllLocalFilesWithBlocksHashAnyFolder(h []byte) (iter.Seq2[string, db.FileMetadata], func() error) { - type row struct { - FolderID string `db:"folder_id"` - db.FileMetadata - } - it, errFn := iterStructs[row](s.stmt(` - SELECT o.folder_id, f.sequence, f.name, f.type, f.modified as modnanos, f.size, f.deleted, f.invalid, f.local_flags as localflags FROM files f - INNER JOIN folders o ON o.idx = f.folder_idx WHERE f.device_idx = {{.LocalDeviceIdx}} AND f.blocklist_hash = ? `).Queryx(h)) - return itererr.Map2(it, errFn, func(r row) (string, db.FileMetadata, error) { - return r.FolderID, r.FileMetadata, nil - }) } -func (s *DB) AllLocalBlocksWithHash(hash []byte) (iter.Seq[db.BlockMapEntry], func() error) { +func (s *folderDB) AllLocalBlocksWithHash(hash []byte) (iter.Seq[db.BlockMapEntry], func() error) { // We involve the files table in this select because deletion of blocks // & blocklists is deferred (garbage collected) while the files list is // not. This filters out blocks that are in fact deleted. @@ -124,3 +104,25 @@ func (s *DB) AllLocalBlocksWithHash(hash []byte) (iter.Seq[db.BlockMapEntry], fu WHERE f.device_idx = {{.LocalDeviceIdx}} AND b.hash = ? `).Queryx(hash)) } + +func (s *folderDB) ListDevicesForFolder() ([]protocol.DeviceID, error) { + var res []string + err := s.stmt(` + SELECT DISTINCT d.device_id FROM counts s + INNER JOIN devices d ON d.idx = s.device_idx + WHERE s.count > 0 AND s.device_idx != {{.LocalDeviceIdx}} + ORDER BY d.device_id + `).Select(&res) + if err != nil { + return nil, wrap(err) + } + + devs := make([]protocol.DeviceID, len(res)) + for i, s := range res { + devs[i], err = protocol.DeviceIDFromString(s) + if err != nil { + return nil, wrap(err) + } + } + return devs, nil +} diff --git a/internal/db/sqlite/folderdb_mtimes.go b/internal/db/sqlite/folderdb_mtimes.go new file mode 100644 index 000000000..051c933ad --- /dev/null +++ b/internal/db/sqlite/folderdb_mtimes.go @@ -0,0 +1,45 @@ +// Copyright (C) 2025 The Syncthing Authors. +// +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this file, +// You can obtain one at https://mozilla.org/MPL/2.0/. + +package sqlite + +import ( + "time" +) + +func (s *folderDB) GetMtime(name string) (ondisk, virtual time.Time) { + var res struct { + Ondisk int64 + Virtual int64 + } + if err := s.stmt(` + SELECT m.ondisk, m.virtual FROM mtimes m + WHERE m.name = ? + `).Get(&res, name); err != nil { + return time.Time{}, time.Time{} + } + return time.Unix(0, res.Ondisk), time.Unix(0, res.Virtual) +} + +func (s *folderDB) PutMtime(name string, ondisk, virtual time.Time) error { + s.updateLock.Lock() + defer s.updateLock.Unlock() + _, err := s.stmt(` + INSERT OR REPLACE INTO mtimes (name, ondisk, virtual) + VALUES (?, ?, ?) + `).Exec(name, ondisk.UnixNano(), virtual.UnixNano()) + return wrap(err) +} + +func (s *folderDB) DeleteMtime(name string) error { + s.updateLock.Lock() + defer s.updateLock.Unlock() + _, err := s.stmt(` + DELETE FROM mtimes + WHERE name = ? + `).Exec(name) + return wrap(err) +} diff --git a/internal/db/sqlite/folderdb_open.go b/internal/db/sqlite/folderdb_open.go new file mode 100644 index 000000000..f0b96bd28 --- /dev/null +++ b/internal/db/sqlite/folderdb_open.go @@ -0,0 +1,110 @@ +// Copyright (C) 2025 The Syncthing Authors. +// +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this file, +// You can obtain one at https://mozilla.org/MPL/2.0/. + +package sqlite + +import ( + "time" + + "github.com/syncthing/syncthing/lib/protocol" +) + +type folderDB struct { + folderID string + *baseDB + + localDeviceIdx int64 + deleteRetention time.Duration +} + +func openFolderDB(folder, path string, deleteRetention time.Duration) (*folderDB, error) { + pragmas := []string{ + "journal_mode = WAL", + "optimize = 0x10002", + "auto_vacuum = INCREMENTAL", + "default_temp_store = MEMORY", + "temp_store = MEMORY", + } + schemas := []string{ + "sql/schema/common/*", + "sql/schema/folder/*", + } + + base, err := openBase(path, maxDBConns, pragmas, schemas, nil) + if err != nil { + return nil, err + } + + fdb := &folderDB{ + folderID: folder, + baseDB: base, + deleteRetention: deleteRetention, + } + + _ = fdb.PutKV("folderID", []byte(folder)) + + // Touch device IDs that should always exist and have a low index + // numbers, and will never change + fdb.localDeviceIdx, _ = fdb.deviceIdxLocked(protocol.LocalDeviceID) + fdb.tplInput["LocalDeviceIdx"] = fdb.localDeviceIdx + + return fdb, nil +} + +// Open the database with options suitable for the migration inserts. This +// is not a safe mode of operation for normal processing, use only for bulk +// inserts with a close afterwards. +func openFolderDBForMigration(folder, path string, deleteRetention time.Duration) (*folderDB, error) { + pragmas := []string{ + "journal_mode = OFF", + "default_temp_store = MEMORY", + "temp_store = MEMORY", + "foreign_keys = 0", + "synchronous = 0", + "locking_mode = EXCLUSIVE", + } + schemas := []string{ + "sql/schema/common/*", + "sql/schema/folder/*", + } + + base, err := openBase(path, 1, pragmas, schemas, nil) + if err != nil { + return nil, err + } + + fdb := &folderDB{ + folderID: folder, + baseDB: base, + deleteRetention: deleteRetention, + } + + // Touch device IDs that should always exist and have a low index + // numbers, and will never change + fdb.localDeviceIdx, _ = fdb.deviceIdxLocked(protocol.LocalDeviceID) + fdb.tplInput["LocalDeviceIdx"] = fdb.localDeviceIdx + + return fdb, nil +} + +func (s *folderDB) deviceIdxLocked(deviceID protocol.DeviceID) (int64, error) { + devStr := deviceID.String() + if _, err := s.stmt(` + INSERT OR IGNORE INTO devices(device_id) + VALUES (?) + `).Exec(devStr); err != nil { + return 0, wrap(err) + } + var idx int64 + if err := s.stmt(` + SELECT idx FROM devices + WHERE device_id = ? + `).Get(&idx, devStr); err != nil { + return 0, wrap(err) + } + + return idx, nil +} diff --git a/internal/db/sqlite/folderdb_update.go b/internal/db/sqlite/folderdb_update.go new file mode 100644 index 000000000..8376b3b92 --- /dev/null +++ b/internal/db/sqlite/folderdb_update.go @@ -0,0 +1,531 @@ +// Copyright (C) 2025 The Syncthing Authors. +// +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this file, +// You can obtain one at https://mozilla.org/MPL/2.0/. + +package sqlite + +import ( + "cmp" + "context" + "fmt" + "slices" + + "github.com/jmoiron/sqlx" + "github.com/syncthing/syncthing/internal/gen/dbproto" + "github.com/syncthing/syncthing/internal/itererr" + "github.com/syncthing/syncthing/lib/osutil" + "github.com/syncthing/syncthing/lib/protocol" + "github.com/syncthing/syncthing/lib/sliceutil" + "google.golang.org/protobuf/proto" +) + +const ( + // Arbitrarily chosen values for checkpoint frequency.... + updatePointsPerFile = 100 + updatePointsPerBlock = 1 + updatePointsThreshold = 250_000 +) + +func (s *folderDB) Update(device protocol.DeviceID, fs []protocol.FileInfo) error { + s.updateLock.Lock() + defer s.updateLock.Unlock() + + deviceIdx, err := s.deviceIdxLocked(device) + if err != nil { + return wrap(err) + } + + tx, err := s.sql.BeginTxx(context.Background(), nil) + if err != nil { + return wrap(err) + } + defer tx.Rollback() //nolint:errcheck + txp := &txPreparedStmts{Tx: tx} + + //nolint:sqlclosecheck + insertFileStmt, err := txp.Preparex(` + INSERT OR REPLACE INTO files (device_idx, remote_sequence, name, type, modified, size, version, deleted, invalid, local_flags, blocklist_hash) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + RETURNING sequence + `) + if err != nil { + return wrap(err, "prepare insert file") + } + + //nolint:sqlclosecheck + insertFileInfoStmt, err := txp.Preparex(` + INSERT INTO fileinfos (sequence, fiprotobuf) + VALUES (?, ?) + `) + if err != nil { + return wrap(err, "prepare insert fileinfo") + } + + //nolint:sqlclosecheck + insertBlockListStmt, err := txp.Preparex(` + INSERT OR IGNORE INTO blocklists (blocklist_hash, blprotobuf) + VALUES (?, ?) + `) + if err != nil { + return wrap(err, "prepare insert blocklist") + } + + var prevRemoteSeq int64 + for i, f := range fs { + f.Name = osutil.NormalizedFilename(f.Name) + + var blockshash *[]byte + if len(f.Blocks) > 0 { + f.BlocksHash = protocol.BlocksHash(f.Blocks) + blockshash = &f.BlocksHash + } else { + f.BlocksHash = nil + } + + if f.Type == protocol.FileInfoTypeDirectory { + f.Size = 128 // synthetic directory size + } + + // Insert the file. + // + // If it is a remote file, set remote_sequence otherwise leave it at + // null. Returns the new local sequence. + var remoteSeq *int64 + if device != protocol.LocalDeviceID { + if i > 0 && f.Sequence == prevRemoteSeq { + return fmt.Errorf("duplicate remote sequence number %d", prevRemoteSeq) + } + prevRemoteSeq = f.Sequence + remoteSeq = &f.Sequence + } + var localSeq int64 + if err := insertFileStmt.Get(&localSeq, deviceIdx, remoteSeq, f.Name, f.Type, f.ModTime().UnixNano(), f.Size, f.Version.String(), f.IsDeleted(), f.IsInvalid(), f.LocalFlags, blockshash); err != nil { + return wrap(err, "insert file") + } + + if len(f.Blocks) > 0 { + // Indirect the block list + blocks := sliceutil.Map(f.Blocks, protocol.BlockInfo.ToWire) + bs, err := proto.Marshal(&dbproto.BlockList{Blocks: blocks}) + if err != nil { + return wrap(err, "marshal blocklist") + } + if _, err := insertBlockListStmt.Exec(f.BlocksHash, bs); err != nil { + return wrap(err, "insert blocklist") + } + + if device == protocol.LocalDeviceID { + // Insert all blocks + if err := s.insertBlocksLocked(txp, f.BlocksHash, f.Blocks); err != nil { + return wrap(err, "insert blocks") + } + } + + f.Blocks = nil + } + + // Insert the fileinfo + if device == protocol.LocalDeviceID { + f.Sequence = localSeq + } + bs, err := proto.Marshal(f.ToWire(true)) + if err != nil { + return wrap(err, "marshal fileinfo") + } + if _, err := insertFileInfoStmt.Exec(localSeq, bs); err != nil { + return wrap(err, "insert fileinfo") + } + + // Update global and need + if err := s.recalcGlobalForFile(txp, f.Name); err != nil { + return wrap(err) + } + } + + if err := tx.Commit(); err != nil { + return wrap(err) + } + + s.periodicCheckpointLocked(fs) + return nil +} + +func (s *folderDB) DropDevice(device protocol.DeviceID) error { + if device == protocol.LocalDeviceID { + panic("bug: cannot drop local device") + } + + s.updateLock.Lock() + defer s.updateLock.Unlock() + + tx, err := s.sql.BeginTxx(context.Background(), nil) + if err != nil { + return wrap(err) + } + defer tx.Rollback() //nolint:errcheck + txp := &txPreparedStmts{Tx: tx} + + // Drop the device, which cascades to delete all files etc for it + if _, err := tx.Exec(`DELETE FROM devices WHERE device_id = ?`, device.String()); err != nil { + return wrap(err) + } + + // Recalc the globals for all affected folders + if err := s.recalcGlobalForFolder(txp); err != nil { + return wrap(err) + } + + return wrap(tx.Commit()) +} + +func (s *folderDB) DropAllFiles(device protocol.DeviceID) error { + s.updateLock.Lock() + defer s.updateLock.Unlock() + + // This is a two part operation, first dropping all the files and then + // recalculating the global state for the entire folder. + + deviceIdx, err := s.deviceIdxLocked(device) + if err != nil { + return wrap(err) + } + + tx, err := s.sql.BeginTxx(context.Background(), nil) + if err != nil { + return wrap(err) + } + defer tx.Rollback() //nolint:errcheck + txp := &txPreparedStmts{Tx: tx} + + // Drop all the file entries + + result, err := tx.Exec(` + DELETE FROM files + WHERE device_idx = ? + `, deviceIdx) + if err != nil { + return wrap(err) + } + if n, err := result.RowsAffected(); err == nil && n == 0 { + // The delete affected no rows, so we don't need to redo the entire + // global/need calculation. + return wrap(tx.Commit()) + } + + // Recalc global for the entire folder + + if err := s.recalcGlobalForFolder(txp); err != nil { + return wrap(err) + } + return wrap(tx.Commit()) +} + +func (s *folderDB) DropFilesNamed(device protocol.DeviceID, names []string) error { + for i := range names { + names[i] = osutil.NormalizedFilename(names[i]) + } + + s.updateLock.Lock() + defer s.updateLock.Unlock() + + deviceIdx, err := s.deviceIdxLocked(device) + if err != nil { + return wrap(err) + } + + tx, err := s.sql.BeginTxx(context.Background(), nil) + if err != nil { + return wrap(err) + } + defer tx.Rollback() //nolint:errcheck + txp := &txPreparedStmts{Tx: tx} + + // Drop the named files + + query, args, err := sqlx.In(` + DELETE FROM files + WHERE device_idx = ? AND name IN (?) + `, deviceIdx, names) + if err != nil { + return wrap(err) + } + if _, err := tx.Exec(query, args...); err != nil { + return wrap(err) + } + + // Recalc globals for the named files + + for _, name := range names { + if err := s.recalcGlobalForFile(txp, name); err != nil { + return wrap(err) + } + } + + return wrap(tx.Commit()) +} + +func (*folderDB) insertBlocksLocked(tx *txPreparedStmts, blocklistHash []byte, blocks []protocol.BlockInfo) error { + if len(blocks) == 0 { + return nil + } + bs := make([]map[string]any, len(blocks)) + for i, b := range blocks { + bs[i] = map[string]any{ + "hash": b.Hash, + "blocklist_hash": blocklistHash, + "idx": i, + "offset": b.Offset, + "size": b.Size, + } + } + + // Very large block lists (>8000 blocks) result in "too many variables" + // error. Chunk it to a reasonable size. + for chunk := range slices.Chunk(bs, 1000) { + if _, err := tx.NamedExec(` + INSERT OR IGNORE INTO blocks (hash, blocklist_hash, idx, offset, size) + VALUES (:hash, :blocklist_hash, :idx, :offset, :size) + `, chunk); err != nil { + return wrap(err) + } + } + return nil +} + +func (s *folderDB) recalcGlobalForFolder(txp *txPreparedStmts) error { + // Select files where there is no global, those are the ones we need to + // recalculate. + //nolint:sqlclosecheck + namesStmt, err := txp.Preparex(` + SELECT f.name FROM files f + WHERE NOT EXISTS ( + SELECT 1 FROM files g + WHERE g.name = f.name AND g.local_flags & ? != 0 + ) + GROUP BY name + `) + if err != nil { + return wrap(err) + } + rows, err := namesStmt.Queryx(protocol.FlagLocalGlobal) + if err != nil { + return wrap(err) + } + defer rows.Close() + for rows.Next() { + var name string + if err := rows.Scan(&name); err != nil { + return wrap(err) + } + if err := s.recalcGlobalForFile(txp, name); err != nil { + return wrap(err) + } + } + return wrap(rows.Err()) +} + +func (s *folderDB) recalcGlobalForFile(txp *txPreparedStmts, file string) error { + //nolint:sqlclosecheck + selStmt, err := txp.Preparex(` + SELECT name, device_idx, sequence, modified, version, deleted, invalid, local_flags FROM files + WHERE name = ? + `) + if err != nil { + return wrap(err) + } + es, err := itererr.Collect(iterStructs[fileRow](selStmt.Queryx(file))) + if err != nil { + return wrap(err) + } + if len(es) == 0 { + // shouldn't happen + return nil + } + + // Sort the entries; the global entry is at the head of the list + slices.SortFunc(es, fileRow.Compare) + + // The global version is the first one in the list that is not invalid, + // or just the first one in the list if all are invalid. + var global fileRow + globIdx := slices.IndexFunc(es, func(e fileRow) bool { return !e.Invalid }) + if globIdx < 0 { + globIdx = 0 + } + global = es[globIdx] + + // We "have" the file if the position in the list of versions is at the + // global version or better, or if the version is the same as the global + // file (we might be further down the list due to invalid flags), or if + // the global is deleted and we don't have it at all... + localIdx := slices.IndexFunc(es, func(e fileRow) bool { return e.DeviceIdx == s.localDeviceIdx }) + hasLocal := localIdx >= 0 && localIdx <= globIdx || // have a better or equal version + localIdx >= 0 && es[localIdx].Version.Equal(global.Version.Vector) || // have an equal version but invalid/ignored + localIdx < 0 && global.Deleted // missing it, but the global is also deleted + + // Set the global flag on the global entry. Set the need flag if the + // local device needs this file, unless it's invalid. + global.LocalFlags |= protocol.FlagLocalGlobal + if hasLocal || global.Invalid { + global.LocalFlags &= ^protocol.FlagLocalNeeded + } else { + global.LocalFlags |= protocol.FlagLocalNeeded + } + //nolint:sqlclosecheck + upStmt, err := txp.Prepare(` + UPDATE files SET local_flags = ? + WHERE device_idx = ? AND sequence = ? + `) + if err != nil { + return wrap(err) + } + if _, err := upStmt.Exec(global.LocalFlags, global.DeviceIdx, global.Sequence); err != nil { + return wrap(err) + } + + // Clear the need and global flags on all other entries + //nolint:sqlclosecheck + upStmt, err = txp.Prepare(` + UPDATE files SET local_flags = local_flags & ? + WHERE name = ? AND sequence != ? AND local_flags & ? != 0 + `) + if err != nil { + return wrap(err) + } + if _, err := upStmt.Exec(^(protocol.FlagLocalNeeded | protocol.FlagLocalGlobal), global.Name, global.Sequence, protocol.FlagLocalNeeded|protocol.FlagLocalGlobal); err != nil { + return wrap(err) + } + + return nil +} + +func (s *DB) folderIdxLocked(folderID string) (int64, error) { + if _, err := s.stmt(` + INSERT OR IGNORE INTO folders(folder_id) + VALUES (?) + `).Exec(folderID); err != nil { + return 0, wrap(err) + } + var idx int64 + if err := s.stmt(` + SELECT idx FROM folders + WHERE folder_id = ? + `).Get(&idx, folderID); err != nil { + return 0, wrap(err) + } + + return idx, nil +} + +type fileRow struct { + Name string + Version dbVector + DeviceIdx int64 `db:"device_idx"` + Sequence int64 + Modified int64 + Size int64 + LocalFlags int64 `db:"local_flags"` + Deleted bool + Invalid bool +} + +func (e fileRow) Compare(other fileRow) int { + // From FileInfo.WinsConflict + vc := e.Version.Vector.Compare(other.Version.Vector) + switch vc { + case protocol.Equal: + if e.Invalid != other.Invalid { + if e.Invalid { + return 1 + } + return -1 + } + + // Compare the device ID index, lower is better. This is only + // deterministic to the extent that LocalDeviceID will always be the + // lowest one, order between remote devices is random (and + // irrelevant). + return cmp.Compare(e.DeviceIdx, other.DeviceIdx) + case protocol.Greater: // we are newer + return -1 + case protocol.Lesser: // we are older + return 1 + case protocol.ConcurrentGreater, protocol.ConcurrentLesser: // there is a conflict + if e.Invalid != other.Invalid { + if e.Invalid { // we are invalid, we lose + return 1 + } + return -1 // they are invalid, we win + } + if e.Deleted != other.Deleted { + if e.Deleted { // we are deleted, we lose + return 1 + } + return -1 // they are deleted, we win + } + if d := cmp.Compare(e.Modified, other.Modified); d != 0 { + return -d // positive d means we were newer, so we win (negative return) + } + if vc == protocol.ConcurrentGreater { + return -1 // we have a better device ID, we win + } + return 1 // they win + default: + return 0 + } +} + +func (s *folderDB) periodicCheckpointLocked(fs []protocol.FileInfo) { + // Induce periodic checkpoints. We add points for each file and block, + // and checkpoint when we've written more than a threshold of points. + // This ensures we do not go too long without a checkpoint, while also + // not doing it incessantly for every update. + s.updatePoints += updatePointsPerFile * len(fs) + for _, f := range fs { + s.updatePoints += len(f.Blocks) * updatePointsPerBlock + } + if s.updatePoints > updatePointsThreshold { + conn, err := s.sql.Conn(context.Background()) + if err != nil { + l.Debugln(s.baseName, "conn:", err) + return + } + defer conn.Close() + if _, err := conn.ExecContext(context.Background(), `PRAGMA journal_size_limit = 8388608`); err != nil { + l.Debugln(s.baseName, "PRAGMA journal_size_limit:", err) + } + + // Every 50th checkpoint becomes a truncate, in an effort to bring + // down the size now and then. + checkpointType := "RESTART" + if s.checkpointsCount > 50 { + checkpointType = "TRUNCATE" + } + cmd := fmt.Sprintf(`PRAGMA wal_checkpoint(%s)`, checkpointType) + row := conn.QueryRowContext(context.Background(), cmd) + + var res, modified, moved int + if row.Err() != nil { + l.Debugln(s.baseName, cmd+":", err) + } else if err := row.Scan(&res, &modified, &moved); err != nil { + l.Debugln(s.baseName, cmd+" (scan):", err) + } else { + l.Debugln(s.baseName, cmd, s.checkpointsCount, "at", s.updatePoints, "returned", res, modified, moved) + } + + // Reset the truncate counter when a truncate succeeded. If it + // failed, we'll keep trying it until we succeed. Increase it faster + // when we fail to checkpoint, as it's more likely the WAL is + // growing and will need truncation when we get out of this state. + if res == 1 { + s.checkpointsCount += 10 + } else if res == 0 && checkpointType == "TRUNCATE" { + s.checkpointsCount = 0 + } else { + s.checkpointsCount++ + } + s.updatePoints = 0 + } +} diff --git a/internal/db/sqlite/sql/schema/10-schema.sql b/internal/db/sqlite/sql/schema/common/10-schema.sql similarity index 100% rename from internal/db/sqlite/sql/schema/10-schema.sql rename to internal/db/sqlite/sql/schema/common/10-schema.sql diff --git a/internal/db/sqlite/sql/schema/70-kv.sql b/internal/db/sqlite/sql/schema/common/70-kv.sql similarity index 100% rename from internal/db/sqlite/sql/schema/70-kv.sql rename to internal/db/sqlite/sql/schema/common/70-kv.sql diff --git a/internal/db/sqlite/sql/schema/00-indexes.sql b/internal/db/sqlite/sql/schema/folder/00-indexes.sql similarity index 63% rename from internal/db/sqlite/sql/schema/00-indexes.sql rename to internal/db/sqlite/sql/schema/folder/00-indexes.sql index 1152d3b36..61e4a26ee 100644 --- a/internal/db/sqlite/sql/schema/00-indexes.sql +++ b/internal/db/sqlite/sql/schema/folder/00-indexes.sql @@ -4,16 +4,9 @@ -- License, v. 2.0. If a copy of the MPL was not distributed with this file, -- You can obtain one at https://mozilla.org/MPL/2.0/. --- folders map folder IDs as used by Syncthing to database folder indexes -CREATE TABLE IF NOT EXISTS folders ( - idx INTEGER NOT NULL PRIMARY KEY, - folder_id TEXT NOT NULL UNIQUE COLLATE BINARY -) STRICT -; - -- devices map device IDs as used by Syncthing to database device indexes CREATE TABLE IF NOT EXISTS devices ( - idx INTEGER NOT NULL PRIMARY KEY, + idx INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, device_id TEXT NOT NULL UNIQUE COLLATE BINARY ) STRICT ; diff --git a/internal/db/sqlite/sql/schema/20-files.sql b/internal/db/sqlite/sql/schema/folder/20-files.sql similarity index 88% rename from internal/db/sqlite/sql/schema/20-files.sql rename to internal/db/sqlite/sql/schema/folder/20-files.sql index ca16be9c3..d826f6340 100644 --- a/internal/db/sqlite/sql/schema/20-files.sql +++ b/internal/db/sqlite/sql/schema/folder/20-files.sql @@ -22,7 +22,6 @@ -- Need bit. This allows for very efficient lookup of files needing handling -- on this device, which is a common query. CREATE TABLE IF NOT EXISTS files ( - folder_idx INTEGER NOT NULL, device_idx INTEGER NOT NULL, -- actual device ID or LocalDeviceID sequence INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, -- our local database sequence, for each and every entry remote_sequence INTEGER, -- remote device's sequence number, null for local or synthetic entries @@ -35,8 +34,7 @@ CREATE TABLE IF NOT EXISTS files ( invalid INTEGER NOT NULL, -- boolean local_flags INTEGER NOT NULL, blocklist_hash BLOB, -- null when there are no blocks - FOREIGN KEY(device_idx) REFERENCES devices(idx) ON DELETE CASCADE, - FOREIGN KEY(folder_idx) REFERENCES folders(idx) ON DELETE CASCADE + FOREIGN KEY(device_idx) REFERENCES devices(idx) ON DELETE CASCADE ) STRICT ; -- FileInfos store the actual protobuf object. We do this separately to keep @@ -48,15 +46,15 @@ CREATE TABLE IF NOT EXISTS fileinfos ( ) STRICT ; -- There can be only one file per folder, device, and remote sequence number -CREATE UNIQUE INDEX IF NOT EXISTS files_remote_sequence ON files (folder_idx, device_idx, remote_sequence) +CREATE UNIQUE INDEX IF NOT EXISTS files_remote_sequence ON files (device_idx, remote_sequence) WHERE remote_sequence IS NOT NULL ; -- There can be only one file per folder, device, and name -CREATE UNIQUE INDEX IF NOT EXISTS files_device_name ON files (folder_idx, device_idx, name) +CREATE UNIQUE INDEX IF NOT EXISTS files_device_name ON files (device_idx, name) ; -- We want to be able to look up & iterate files based on just folder and name -CREATE INDEX IF NOT EXISTS files_name_only ON files (folder_idx, name) +CREATE INDEX IF NOT EXISTS files_name_only ON files (name) ; -- We want to be able to look up & iterate files based on blocks hash -CREATE INDEX IF NOT EXISTS files_blocklist_hash_only ON files (blocklist_hash, device_idx, folder_idx) WHERE blocklist_hash IS NOT NULL +CREATE INDEX IF NOT EXISTS files_blocklist_hash_only ON files (blocklist_hash, device_idx) WHERE blocklist_hash IS NOT NULL ; diff --git a/internal/db/sqlite/sql/schema/30-indexids.sql b/internal/db/sqlite/sql/schema/folder/30-indexids.sql similarity index 70% rename from internal/db/sqlite/sql/schema/30-indexids.sql rename to internal/db/sqlite/sql/schema/folder/30-indexids.sql index dde9851bc..e6a40bb53 100644 --- a/internal/db/sqlite/sql/schema/30-indexids.sql +++ b/internal/db/sqlite/sql/schema/folder/30-indexids.sql @@ -7,18 +7,16 @@ -- indexids holds the index ID and maximum sequence for a given device and folder CREATE TABLE IF NOT EXISTS indexids ( device_idx INTEGER NOT NULL, - folder_idx INTEGER NOT NULL, index_id TEXT NOT NULL COLLATE BINARY, sequence INTEGER NOT NULL DEFAULT 0, - PRIMARY KEY(device_idx, folder_idx), - FOREIGN KEY(folder_idx) REFERENCES folders(idx) ON DELETE CASCADE, + PRIMARY KEY(device_idx), FOREIGN KEY(device_idx) REFERENCES devices(idx) ON DELETE CASCADE ) STRICT, WITHOUT ROWID ; CREATE TRIGGER IF NOT EXISTS indexids_seq AFTER INSERT ON files BEGIN - INSERT INTO indexids (folder_idx, device_idx, index_id, sequence) - VALUES (NEW.folder_idx, NEW.device_idx, "", COALESCE(NEW.remote_sequence, NEW.sequence)) + INSERT INTO indexids (device_idx, index_id, sequence) + VALUES (NEW.device_idx, "", COALESCE(NEW.remote_sequence, NEW.sequence)) ON CONFLICT DO UPDATE SET sequence = COALESCE(NEW.remote_sequence, NEW.sequence); END ; diff --git a/internal/db/sqlite/sql/schema/40-counts.sql b/internal/db/sqlite/sql/schema/folder/40-counts.sql similarity index 57% rename from internal/db/sqlite/sql/schema/40-counts.sql rename to internal/db/sqlite/sql/schema/folder/40-counts.sql index cac1851fb..9b123c725 100644 --- a/internal/db/sqlite/sql/schema/40-counts.sql +++ b/internal/db/sqlite/sql/schema/folder/40-counts.sql @@ -9,16 +9,14 @@ -- Counts and sizes are maintained for each device, folder, type, flag bits -- combination. CREATE TABLE IF NOT EXISTS counts ( - folder_idx INTEGER NOT NULL, device_idx INTEGER NOT NULL, type INTEGER NOT NULL, local_flags INTEGER NOT NULL, count INTEGER NOT NULL, size INTEGER NOT NULL, deleted INTEGER NOT NULL, -- boolean - PRIMARY KEY(folder_idx, device_idx, type, local_flags, deleted), - FOREIGN KEY(device_idx) REFERENCES devices(idx) ON DELETE CASCADE, - FOREIGN KEY(folder_idx) REFERENCES folders(idx) ON DELETE CASCADE + PRIMARY KEY(device_idx, type, local_flags, deleted), + FOREIGN KEY(device_idx) REFERENCES devices(idx) ON DELETE CASCADE ) STRICT, WITHOUT ROWID ; @@ -26,28 +24,24 @@ CREATE TABLE IF NOT EXISTS counts ( CREATE TRIGGER IF NOT EXISTS counts_insert AFTER INSERT ON files BEGIN - INSERT INTO counts (folder_idx, device_idx, type, local_flags, count, size, deleted) - VALUES (NEW.folder_idx, NEW.device_idx, NEW.type, NEW.local_flags, 1, NEW.size, NEW.deleted) + INSERT INTO counts (device_idx, type, local_flags, count, size, deleted) + VALUES (NEW.device_idx, NEW.type, NEW.local_flags, 1, NEW.size, NEW.deleted) ON CONFLICT DO UPDATE SET count = count + 1, size = size + NEW.size; END ; CREATE TRIGGER IF NOT EXISTS counts_delete AFTER DELETE ON files BEGIN UPDATE counts SET count = count - 1, size = size - OLD.size - WHERE folder_idx = OLD.folder_idx AND device_idx = OLD.device_idx AND type = OLD.type AND local_flags = OLD.local_flags AND deleted = OLD.deleted; + WHERE device_idx = OLD.device_idx AND type = OLD.type AND local_flags = OLD.local_flags AND deleted = OLD.deleted; END ; CREATE TRIGGER IF NOT EXISTS counts_update AFTER UPDATE OF local_flags ON files WHEN NEW.local_flags != OLD.local_flags BEGIN - INSERT INTO counts (folder_idx, device_idx, type, local_flags, count, size, deleted) - VALUES (NEW.folder_idx, NEW.device_idx, NEW.type, NEW.local_flags, 1, NEW.size, NEW.deleted) + INSERT INTO counts (device_idx, type, local_flags, count, size, deleted) + VALUES (NEW.device_idx, NEW.type, NEW.local_flags, 1, NEW.size, NEW.deleted) ON CONFLICT DO UPDATE SET count = count + 1, size = size + NEW.size; UPDATE counts SET count = count - 1, size = size - OLD.size - WHERE folder_idx = OLD.folder_idx AND device_idx = OLD.device_idx AND type = OLD.type AND local_flags = OLD.local_flags AND deleted = OLD.deleted; + WHERE device_idx = OLD.device_idx AND type = OLD.type AND local_flags = OLD.local_flags AND deleted = OLD.deleted; END ; -DROP TRIGGER IF EXISTS counts_update_add -- tmp migration -; -DROP TRIGGER IF EXISTS counts_update_del -- tmp migration -; diff --git a/internal/db/sqlite/sql/schema/50-blocks.sql b/internal/db/sqlite/sql/schema/folder/50-blocks.sql similarity index 100% rename from internal/db/sqlite/sql/schema/50-blocks.sql rename to internal/db/sqlite/sql/schema/folder/50-blocks.sql diff --git a/internal/db/sqlite/sql/schema/50-mtimes.sql b/internal/db/sqlite/sql/schema/folder/50-mtimes.sql similarity index 76% rename from internal/db/sqlite/sql/schema/50-mtimes.sql rename to internal/db/sqlite/sql/schema/folder/50-mtimes.sql index 1d6f6b5ba..5b3313c10 100644 --- a/internal/db/sqlite/sql/schema/50-mtimes.sql +++ b/internal/db/sqlite/sql/schema/folder/50-mtimes.sql @@ -6,11 +6,9 @@ --- Backing for the MtimeFS CREATE TABLE IF NOT EXISTS mtimes ( - folder_idx INTEGER NOT NULL, name TEXT NOT NULL, ondisk INTEGER NOT NULL, -- unix nanos virtual INTEGER NOT NULL, -- unix nanos - PRIMARY KEY(folder_idx, name), - FOREIGN KEY(folder_idx) REFERENCES folders(idx) ON DELETE CASCADE + PRIMARY KEY(name) ) STRICT, WITHOUT ROWID ; diff --git a/internal/db/sqlite/sql/schema/main/00-indexes.sql b/internal/db/sqlite/sql/schema/main/00-indexes.sql new file mode 100644 index 000000000..0a0d6a8dd --- /dev/null +++ b/internal/db/sqlite/sql/schema/main/00-indexes.sql @@ -0,0 +1,12 @@ +-- Copyright (C) 2025 The Syncthing Authors. +-- +-- This Source Code Form is subject to the terms of the Mozilla Public +-- License, v. 2.0. If a copy of the MPL was not distributed with this file, +-- You can obtain one at https://mozilla.org/MPL/2.0/. + +-- folders map folder IDs as used by Syncthing to database folder indexes +CREATE TABLE IF NOT EXISTS folders ( + idx INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, + folder_id TEXT NOT NULL UNIQUE COLLATE BINARY +) STRICT +; diff --git a/lib/locations/locations.go b/lib/locations/locations.go index 3c38c7511..153b89d1d 100644 --- a/lib/locations/locations.go +++ b/lib/locations/locations.go @@ -48,7 +48,7 @@ const ( UserHomeBaseDir BaseDirEnum = "userHome" levelDBDir = "index-v0.14.0.db" - databaseName = "index-v2.db" + databaseName = "index-v2" configFileName = "config.xml" defaultStateDir = ".local/state/syncthing" oldDefaultConfigDir = ".config/syncthing" diff --git a/lib/model/folder_sendrecv.go b/lib/model/folder_sendrecv.go index 3500616eb..03718fb91 100644 --- a/lib/model/folder_sendrecv.go +++ b/lib/model/folder_sendrecv.go @@ -315,23 +315,15 @@ func (f *sendReceiveFolder) processNeeded(dbUpdateChan chan<- dbUpdateJob, copyC fileDeletions := map[string]protocol.FileInfo{} buckets := map[string][]protocol.FileInfo{} - // Buffer the full list of needed files. This is somewhat wasteful and - // uses a lot of memory, but we need to keep the duration of the - // database read short and not do a bunch of file and data I/O inside - // the loop. If we forego the ability for users to repriorize the pull - // queue on the fly we could do this in batches, though that would also - // be a bit slower and less efficient in other ways. - files, err := itererr.Collect(f.model.sdb.AllNeededGlobalFiles(f.folderID, protocol.LocalDeviceID, f.Order, 0, 0)) - if err != nil { - return changed, nil, nil, err - } - // Iterate the list of items that we need and sort them into piles. // Regular files to pull goes into the file queue, everything else // (directories, symlinks and deletes) goes into the "process directly" // pile. loop: - for _, file := range files { + for file, err := range itererr.Zip(f.model.sdb.AllNeededGlobalFiles(f.folderID, protocol.LocalDeviceID, f.Order, 0, 0)) { + if err != nil { + return changed, nil, nil, err + } select { case <-f.ctx.Done(): break loop @@ -1353,58 +1345,58 @@ func (f *sendReceiveFolder) copierRoutine(in <-chan copyBlocksState, pullChan ch buf = protocol.BufferPool.Upgrade(buf, int(block.Size)) found := false - for e, err := range itererr.Zip(f.model.sdb.AllLocalBlocksWithHash(block.Hash)) { + blocks, _ := f.model.sdb.AllLocalBlocksWithHash(block.Hash) + for _, e := range blocks { + res, err := f.model.sdb.AllLocalFilesWithBlocksHashAnyFolder(e.BlocklistHash) if err != nil { - break + continue } - it, errFn := f.model.sdb.AllLocalFilesWithBlocksHashAnyFolder(e.BlocklistHash) - for folderID, fi := range it { + for folderID, files := range res { ffs := folderFilesystems[folderID] - fd, err := ffs.Open(fi.Name) - if err != nil { - continue - } - defer fd.Close() - - _, err = fd.ReadAt(buf, e.Offset) - if err != nil { - fd.Close() - continue - } - - // Hash is not SHA256 as it's an encrypted hash token. In that - // case we can't verify the block integrity so we'll take it on - // trust. (The other side can and will verify.) - if f.Type != config.FolderTypeReceiveEncrypted { - if err := f.verifyBuffer(buf, block); err != nil { - l.Debugln("Finder failed to verify buffer", err) + for _, fi := range files { + fd, err := ffs.Open(fi.Name) + if err != nil { continue } - } + defer fd.Close() - if f.CopyRangeMethod != config.CopyRangeMethodStandard { - err = f.withLimiter(func() error { - dstFd.mut.Lock() - defer dstFd.mut.Unlock() - return fs.CopyRange(f.CopyRangeMethod.ToFS(), fd, dstFd.fd, e.Offset, block.Offset, int64(block.Size)) - }) - } else { - err = f.limitedWriteAt(dstFd, buf, block.Offset) - } - if err != nil { - state.fail(fmt.Errorf("dst write: %w", err)) + _, err = fd.ReadAt(buf, e.Offset) + if err != nil { + fd.Close() + continue + } + + // Hash is not SHA256 as it's an encrypted hash token. In that + // case we can't verify the block integrity so we'll take it on + // trust. (The other side can and will verify.) + if f.Type != config.FolderTypeReceiveEncrypted { + if err := f.verifyBuffer(buf, block); err != nil { + l.Debugln("Finder failed to verify buffer", err) + continue + } + } + + if f.CopyRangeMethod != config.CopyRangeMethodStandard { + err = f.withLimiter(func() error { + dstFd.mut.Lock() + defer dstFd.mut.Unlock() + return fs.CopyRange(f.CopyRangeMethod.ToFS(), fd, dstFd.fd, e.Offset, block.Offset, int64(block.Size)) + }) + } else { + err = f.limitedWriteAt(dstFd, buf, block.Offset) + } + if err != nil { + state.fail(fmt.Errorf("dst write: %w", err)) + break + } + if fi.Name == state.file.Name { + state.copiedFromOrigin(block.Size) + } else { + state.copiedFromElsewhere(block.Size) + } + found = true break } - if fi.Name == state.file.Name { - state.copiedFromOrigin(block.Size) - } else { - state.copiedFromElsewhere(block.Size) - } - found = true - break - } - if err := errFn(); err != nil { - l.Warnln(err) } } diff --git a/lib/model/folder_sendrecv_test.go b/lib/model/folder_sendrecv_test.go index bd1376262..e307670da 100644 --- a/lib/model/folder_sendrecv_test.go +++ b/lib/model/folder_sendrecv_test.go @@ -19,7 +19,6 @@ import ( "testing" "time" - "github.com/syncthing/syncthing/internal/itererr" "github.com/syncthing/syncthing/lib/build" "github.com/syncthing/syncthing/lib/config" "github.com/syncthing/syncthing/lib/events" @@ -326,11 +325,11 @@ func TestCopierCleanup(t *testing.T) { // Update index (removing old blocks) f.updateLocalsFromScanning([]protocol.FileInfo{file}) - if vals, err := itererr.Collect(m.sdb.AllLocalBlocksWithHash(blocks[0].Hash)); err != nil || len(vals) > 0 { + if vals, err := m.sdb.AllLocalBlocksWithHash(blocks[0].Hash); err != nil || len(vals) > 0 { t.Error("Unexpected block found") } - if vals, err := itererr.Collect(m.sdb.AllLocalBlocksWithHash(blocks[1].Hash)); err != nil || len(vals) == 0 { + if vals, err := m.sdb.AllLocalBlocksWithHash(blocks[1].Hash); err != nil || len(vals) == 0 { t.Error("Expected block not found") } @@ -339,11 +338,11 @@ func TestCopierCleanup(t *testing.T) { // Update index (removing old blocks) f.updateLocalsFromScanning([]protocol.FileInfo{file}) - if vals, err := itererr.Collect(m.sdb.AllLocalBlocksWithHash(blocks[0].Hash)); err != nil || len(vals) == 0 { + if vals, err := m.sdb.AllLocalBlocksWithHash(blocks[0].Hash); err != nil || len(vals) == 0 { t.Error("Unexpected block found") } - if vals, err := itererr.Collect(m.sdb.AllLocalBlocksWithHash(blocks[1].Hash)); err != nil || len(vals) > 0 { + if vals, err := m.sdb.AllLocalBlocksWithHash(blocks[1].Hash); err != nil || len(vals) > 0 { t.Error("Expected block not found") } }