From 82a0dd8eaab9f172c09f6beb36f85fc677fd39bd Mon Sep 17 00:00:00 2001
From: Jakob Borg
Date: Wed, 2 Apr 2025 13:19:34 -0700
Subject: [PATCH] chore(db): use shorter read transactions and periodic
 checkpoint for smaller WAL (#10027)

Also make sure our journal size limit is in effect when checkpointing.
---
 internal/db/sqlite/db.go         |  1 +
 internal/db/sqlite/db_open.go    |  4 ----
 internal/db/sqlite/db_service.go | 14 +++++++----
 internal/db/sqlite/db_update.go  | 41 +++++++++++++++++++++++++++++++-
 lib/model/indexhandler.go        | 17 ++-----------
 5 files changed, 53 insertions(+), 24 deletions(-)

diff --git a/internal/db/sqlite/db.go b/internal/db/sqlite/db.go
index cb2da41d8..960218e1b 100644
--- a/internal/db/sqlite/db.go
+++ b/internal/db/sqlite/db.go
@@ -20,6 +20,7 @@ type DB struct {
 	sql            *sqlx.DB
 	localDeviceIdx int64
 	updateLock     sync.Mutex
+	updatePoints   int
 
 	statementsMut sync.RWMutex
 	statements    map[string]*sqlx.Stmt
diff --git a/internal/db/sqlite/db_open.go b/internal/db/sqlite/db_open.go
index 2f959be15..0a482f17c 100644
--- a/internal/db/sqlite/db_open.go
+++ b/internal/db/sqlite/db_open.go
@@ -36,10 +36,6 @@ func Open(path string) (*DB, error) {
 		// https://www.sqlite.org/pragma.html#pragma_optimize
 		return nil, wrap(err, "PRAGMA optimize")
 	}
-	if _, err := sqlDB.Exec(`PRAGMA journal_size_limit = 67108864`); err != nil {
-		// https://www.powersync.com/blog/sqlite-optimizations-for-ultra-high-performance
-		return nil, wrap(err, "PRAGMA journal_size_limit")
-	}
 
 	return openCommon(sqlDB)
 }
diff --git a/internal/db/sqlite/db_service.go b/internal/db/sqlite/db_service.go
index 647c1fdc1..b4bd71a1b 100644
--- a/internal/db/sqlite/db_service.go
+++ b/internal/db/sqlite/db_service.go
@@ -86,10 +86,16 @@ func (s *Service) periodic(ctx context.Context) error {
 		return wrap(err)
 	}
 
-	_, _ = s.sdb.sql.ExecContext(ctx, `ANALYZE`)
-	_, _ = s.sdb.sql.ExecContext(ctx, `PRAGMA optimize`)
-	_, _ = s.sdb.sql.ExecContext(ctx, `PRAGMA incremental_vacuum`)
-	_, _ = s.sdb.sql.ExecContext(ctx, `PRAGMA wal_checkpoint(TRUNCATE)`)
+	conn, err := s.sdb.sql.Conn(ctx)
+	if err != nil {
+		return wrap(err)
+	}
+	defer conn.Close()
+	_, _ = conn.ExecContext(ctx, `ANALYZE`)
+	_, _ = conn.ExecContext(ctx, `PRAGMA optimize`)
+	_, _ = conn.ExecContext(ctx, `PRAGMA incremental_vacuum`)
+	_, _ = conn.ExecContext(ctx, `PRAGMA journal_size_limit = 67108864`)
+	_, _ = conn.ExecContext(ctx, `PRAGMA wal_checkpoint(TRUNCATE)`)
 
 	return nil
 }
diff --git a/internal/db/sqlite/db_update.go b/internal/db/sqlite/db_update.go
index d6db8b30a..6bd34e6a4 100644
--- a/internal/db/sqlite/db_update.go
+++ b/internal/db/sqlite/db_update.go
@@ -23,6 +23,13 @@ import (
 	"google.golang.org/protobuf/proto"
 )
 
+const (
+	// Arbitrarily chosen values for checkpoint frequency...
+	updatePointsPerFile   = 100
+	updatePointsPerBlock  = 1
+	updatePointsThreshold = 250_000
+)
+
 func (s *DB) Update(folder string, device protocol.DeviceID, fs []protocol.FileInfo) error {
 	s.updateLock.Lock()
 	defer s.updateLock.Unlock()
@@ -143,7 +150,12 @@ func (s *DB) Update(folder string, device protocol.DeviceID, fs []protocol.FileI
 		}
 	}
 
-	return wrap(tx.Commit())
+	if err := tx.Commit(); err != nil {
+		return wrap(err)
+	}
+
+	s.periodicCheckpointLocked(fs)
+	return nil
 }
 
 func (s *DB) DropFolder(folder string) error {
@@ -554,3 +566,30 @@ func (e fileRow) Compare(other fileRow) int {
 		return 0
 	}
 }
+
+func (s *DB) periodicCheckpointLocked(fs []protocol.FileInfo) {
+	// Induce periodic checkpoints. We add points for each file and block,
+	// and checkpoint when we've written more than a threshold of points.
+	// This ensures we do not go too long without a checkpoint, while also
+	// not doing it incessantly for every update.
+	s.updatePoints += updatePointsPerFile * len(fs)
+	for _, f := range fs {
+		s.updatePoints += len(f.Blocks) * updatePointsPerBlock
+	}
+	if s.updatePoints > updatePointsThreshold {
+		l.Debugln("checkpoint at", s.updatePoints)
+		conn, err := s.sql.Conn(context.Background())
+		if err != nil {
+			l.Debugln("conn:", err)
+			return
+		}
+		defer conn.Close()
+		if _, err := conn.ExecContext(context.Background(), `PRAGMA journal_size_limit = 67108864`); err != nil {
+			l.Debugln("PRAGMA journal_size_limit:", err)
+		}
+		if _, err := conn.ExecContext(context.Background(), `PRAGMA wal_checkpoint(RESTART)`); err != nil {
+			l.Debugln("PRAGMA wal_checkpoint(RESTART):", err)
+		}
+		s.updatePoints = 0
+	}
+}
diff --git a/lib/model/indexhandler.go b/lib/model/indexhandler.go
index 49c33666e..2f4d2d9c7 100644
--- a/lib/model/indexhandler.go
+++ b/lib/model/indexhandler.go
@@ -295,7 +295,6 @@ func (s *indexHandler) sendIndexTo(ctx context.Context) error {
 	var f protocol.FileInfo
 	previousWasDelete := false
 
-	t0 := time.Now()
 	for fi, err := range itererr.Zip(s.sdb.AllLocalFilesBySequence(s.folder, protocol.LocalDeviceID, s.localPrevSequence+1, 5000)) {
 		if err != nil {
 			return err
@@ -304,15 +303,7 @@ func (s *indexHandler) sendIndexTo(ctx context.Context) error {
 		// Even if the batch is full, we allow a last delete to slip in, we do this by making sure that
 		// the batch ends with a non-delete, or that the last item in the batch is already a delete
 		if batch.Full() && (!fi.IsDeleted() || previousWasDelete) {
-			if err := batch.Flush(); err != nil {
-				return err
-			}
-			if time.Since(t0) > 5*time.Second {
-				// minor hack -- avoid very long running read transactions
-				// during index transmission, to help prevent excessive
-				// growth of database WAL file
-				break
-			}
+			break
 		}
 
 		if fi.SequenceNo() < s.localPrevSequence+1 {
@@ -348,11 +339,7 @@ func (s *indexHandler) sendIndexTo(ctx context.Context) error {
 		batch.Append(f)
 	}
 
-	if err := batch.Flush(); err != nil {
-		return err
-	}
-
-	return nil
+	return batch.Flush()
 }
 
 func (s *indexHandler) receive(fs []protocol.FileInfo, update bool, op string, prevSequence, lastSequence int64) error {
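
Note for reviewers: the points-based checkpoint heuristic above is easy to try
outside the Syncthing tree. Below is a minimal standalone sketch of the same
idea. The driver (modernc.org/sqlite), the checkpointer type, and the example
numbers are illustrative assumptions, not part of this patch; only the points
arithmetic and the PRAGMA sequence mirror periodicCheckpointLocked.

package main

import (
	"context"
	"database/sql"
	"log"

	_ "modernc.org/sqlite" // assumed driver; any SQLite driver in WAL mode works
)

const (
	pointsPerFile   = 100
	pointsPerBlock  = 1
	pointsThreshold = 250_000
)

// checkpointer accumulates "points" for each write and forces a WAL
// checkpoint once a threshold of work has been done, bounding WAL growth
// without checkpointing on every update.
type checkpointer struct {
	db     *sql.DB
	points int
}

func (c *checkpointer) noteUpdate(files, blocks int) {
	c.points += files*pointsPerFile + blocks*pointsPerBlock
	if c.points <= pointsThreshold {
		return
	}
	ctx := context.Background()
	// journal_size_limit is a per-connection setting, so pin a single
	// connection and set it on the same connection that runs the
	// checkpoint -- the point of this patch, since database/sql pools
	// connections and a limit set at open time may land elsewhere.
	conn, err := c.db.Conn(ctx)
	if err != nil {
		log.Println("conn:", err)
		return
	}
	defer conn.Close()
	if _, err := conn.ExecContext(ctx, `PRAGMA journal_size_limit = 67108864`); err != nil {
		log.Println("journal_size_limit:", err)
	}
	if _, err := conn.ExecContext(ctx, `PRAGMA wal_checkpoint(RESTART)`); err != nil {
		log.Println("wal_checkpoint:", err)
	}
	c.points = 0
}

func main() {
	db, err := sql.Open("sqlite", "file:example.db?_pragma=journal_mode(wal)")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	cp := &checkpointer{db: db}
	// Pretend we just committed 2000 files of 50 blocks each; this crosses
	// the threshold (2000*100 + 100000*1 = 300000 > 250000) and triggers a
	// checkpoint.
	cp.noteUpdate(2000, 100_000)
}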