sqlite: Make migrations atomic

Setting the version number only when all migrations are done
means that the version will be wrong if a migration fails.

Signed-off-by: Kévin Commaille <zecakeh@tedomum.fr>
This commit is contained in:
Kévin Commaille
2024-08-27 11:54:39 +02:00
committed by Stefan Ceriu
parent 9edca06d3b
commit 66e901bb9b
4 changed files with 45 additions and 35 deletions

View File

@@ -203,21 +203,24 @@ async fn run_migrations(conn: &SqliteAsyncConn, version: u8) -> Result<()> {
// the error message: "cannot change into wal mode from within a transaction".
conn.execute_batch("PRAGMA journal_mode = wal;").await?;
conn.with_transaction(|txn| {
txn.execute_batch(include_str!("../migrations/crypto_store/001_init.sql"))
txn.execute_batch(include_str!("../migrations/crypto_store/001_init.sql"))?;
txn.set_db_version(1)
})
.await?;
}
if version < 2 {
conn.with_transaction(|txn| {
txn.execute_batch(include_str!("../migrations/crypto_store/002_reset_olm_hash.sql"))
txn.execute_batch(include_str!("../migrations/crypto_store/002_reset_olm_hash.sql"))?;
txn.set_db_version(2)
})
.await?;
}
if version < 3 {
conn.with_transaction(|txn| {
txn.execute_batch(include_str!("../migrations/crypto_store/003_room_settings.sql"))
txn.execute_batch(include_str!("../migrations/crypto_store/003_room_settings.sql"))?;
txn.set_db_version(3)
})
.await?;
}
@@ -226,14 +229,16 @@ async fn run_migrations(conn: &SqliteAsyncConn, version: u8) -> Result<()> {
conn.with_transaction(|txn| {
txn.execute_batch(include_str!(
"../migrations/crypto_store/004_drop_outbound_group_sessions.sql"
))
))?;
txn.set_db_version(4)
})
.await?;
}
if version < 5 {
conn.with_transaction(|txn| {
txn.execute_batch(include_str!("../migrations/crypto_store/005_withheld_code.sql"))
txn.execute_batch(include_str!("../migrations/crypto_store/005_withheld_code.sql"))?;
txn.set_db_version(5)
})
.await?;
}
@@ -242,27 +247,28 @@ async fn run_migrations(conn: &SqliteAsyncConn, version: u8) -> Result<()> {
conn.with_transaction(|txn| {
txn.execute_batch(include_str!(
"../migrations/crypto_store/006_drop_outbound_group_sessions.sql"
))
))?;
txn.set_db_version(6)
})
.await?;
}
if version < 7 {
conn.with_transaction(|txn| {
txn.execute_batch(include_str!("../migrations/crypto_store/007_lock_leases.sql"))
txn.execute_batch(include_str!("../migrations/crypto_store/007_lock_leases.sql"))?;
txn.set_db_version(7)
})
.await?;
}
if version < 8 {
conn.with_transaction(|txn| {
txn.execute_batch(include_str!("../migrations/crypto_store/008_secret_inbox.sql"))
txn.execute_batch(include_str!("../migrations/crypto_store/008_secret_inbox.sql"))?;
txn.set_db_version(8)
})
.await?;
}
conn.set_db_version(DATABASE_VERSION).await?;
Ok(())
}

View File

@@ -13,7 +13,7 @@ use tracing::debug;
use crate::{
error::{Error, Result},
utils::{Key, SqliteAsyncConnExt, SqliteKeyValueStoreAsyncConnExt},
utils::{Key, SqliteAsyncConnExt, SqliteKeyValueStoreAsyncConnExt, SqliteKeyValueStoreConnExt},
OpenStoreError,
};
@@ -83,7 +83,12 @@ impl SqliteEventCacheStore {
/// version
///
/// If `to` is `None`, the current database version will be used.
async fn run_migrations(&self, conn: &SqliteAsyncConn, from: u8, to: Option<u8>) -> Result<()> {
async fn run_migrations(
&self,
_conn: &SqliteAsyncConn,
from: u8,
to: Option<u8>,
) -> Result<()> {
let to = to.unwrap_or(DATABASE_VERSION);
if from < to {
@@ -94,8 +99,6 @@ impl SqliteEventCacheStore {
// There is no migration currently since it's the first version of the database.
conn.set_db_version(to).await?;
Ok(())
}
@@ -144,12 +147,11 @@ async fn init(conn: &SqliteAsyncConn) -> Result<()> {
// the error message: "cannot change into wal mode from within a transaction".
conn.execute_batch("PRAGMA journal_mode = wal;").await?;
conn.with_transaction(|txn| {
txn.execute_batch(include_str!("../migrations/event_cache_store/001_init.sql"))
txn.execute_batch(include_str!("../migrations/event_cache_store/001_init.sql"))?;
txn.set_db_version(1)
})
.await?;
conn.set_db_version(1).await?;
Ok(())
}

View File

@@ -41,7 +41,10 @@ use tracing::{debug, warn};
use crate::{
error::{Error, Result},
utils::{repeat_vars, Key, SqliteAsyncConnExt, SqliteKeyValueStoreAsyncConnExt},
utils::{
repeat_vars, Key, SqliteAsyncConnExt, SqliteKeyValueStoreAsyncConnExt,
SqliteKeyValueStoreConnExt,
},
OpenStoreError,
};
@@ -161,6 +164,7 @@ impl SqliteStateStore {
"../migrations/state_store/002_b_replace_room_info.sql"
))?;
txn.set_db_version(2)?;
Result::<_, Error>::Ok(())
})
.await?;
@@ -211,6 +215,7 @@ impl SqliteStateStore {
.execute((data, room_id))?;
}
txn.set_db_version(3)?;
Result::<_, Error>::Ok(())
})
.await?;
@@ -220,7 +225,7 @@ impl SqliteStateStore {
conn.with_transaction(move |txn| {
// Create new table.
txn.execute_batch(include_str!("../migrations/state_store/003_send_queue.sql"))?;
Result::<_, Error>::Ok(())
txn.set_db_version(4)
})
.await?;
}
@@ -231,7 +236,7 @@ impl SqliteStateStore {
txn.execute_batch(include_str!(
"../migrations/state_store/004_send_queue_with_roomid_value.sql"
))?;
Result::<_, Error>::Ok(())
txn.set_db_version(4)
})
.await?;
}
@@ -242,7 +247,7 @@ impl SqliteStateStore {
txn.execute_batch(include_str!(
"../migrations/state_store/005_send_queue_dependent_events.sql"
))?;
Result::<_, Error>::Ok(())
txn.set_db_version(6)
})
.await?;
}
@@ -251,13 +256,11 @@ impl SqliteStateStore {
conn.with_transaction(move |txn| {
// Drop media table.
txn.execute_batch(include_str!("../migrations/state_store/006_drop_media.sql"))?;
Result::<_, Error>::Ok(())
txn.set_db_version(7)
})
.await?;
}
conn.set_db_version(to).await?;
Ok(())
}
@@ -375,13 +378,12 @@ async fn init(conn: &SqliteAsyncConn) -> Result<()> {
// the error message: "cannot change into wal mode from within a transaction".
conn.execute_batch("PRAGMA journal_mode = wal;").await?;
conn.with_transaction(|txn| {
txn.execute_batch(include_str!("../migrations/state_store/001_init.sql"))
txn.execute_batch(include_str!("../migrations/state_store/001_init.sql"))?;
txn.set_db_version(1)?;
Ok(())
})
.await?;
conn.set_db_version(1).await?;
Ok(())
.await
}
trait SqliteConnectionStateStoreExt {

View File

@@ -256,6 +256,11 @@ impl<'a> SqliteTransactionExt for Transaction<'a> {
pub(crate) trait SqliteKeyValueStoreConnExt {
/// Store the given value for the given key.
fn set_kv(&self, key: &str, value: &[u8]) -> rusqlite::Result<()>;
/// Set the version of the database.
fn set_db_version(&self, version: u8) -> rusqlite::Result<()> {
self.set_kv("version", &[version])
}
}
impl SqliteKeyValueStoreConnExt for rusqlite::Connection {
@@ -317,11 +322,6 @@ pub(crate) trait SqliteKeyValueStoreAsyncConnExt: SqliteAsyncConnExt {
}
}
/// Set the version of the database.
async fn set_db_version(&self, version: u8) -> rusqlite::Result<()> {
self.set_kv("version", vec![version]).await
}
/// Get the [`StoreCipher`] of the database or create it.
async fn get_or_create_store_cipher(
&self,