From dc40309cbec5e4e8131a8782bab5538e8003cc69 Mon Sep 17 00:00:00 2001 From: Benjamin Kampmann Date: Wed, 6 Jul 2022 16:06:47 +0200 Subject: [PATCH] feat(sled): Introduce SledStoreBuilder, allow migration conflict strategy configuration --- crates/matrix-sdk-sled/Cargo.toml | 3 + crates/matrix-sdk-sled/src/lib.rs | 29 +- crates/matrix-sdk-sled/src/state_store.rs | 377 ++++++++++++++++------ 3 files changed, 302 insertions(+), 107 deletions(-) diff --git a/crates/matrix-sdk-sled/Cargo.toml b/crates/matrix-sdk-sled/Cargo.toml index 787935531..8edc42c78 100644 --- a/crates/matrix-sdk-sled/Cargo.toml +++ b/crates/matrix-sdk-sled/Cargo.toml @@ -27,6 +27,7 @@ experimental-timeline = ["matrix-sdk-base/experimental-timeline"] async-stream = "0.3.3" async-trait = "0.1.53" dashmap = "5.2.0" +derive_builder = "0.11.2" futures-core = "0.3.21" futures-util = { version = "0.3.21", default-features = false } matrix-sdk-base = { version = "0.5.0", path = "../matrix-sdk-base", optional = true } @@ -40,6 +41,7 @@ sled = "0.34.7" thiserror = "1.0.30" tokio = { version = "1.17.0", default-features = false, features = ["sync", "fs"] } tracing = "0.1.34" + fs_extra = "1.2.0" [dev-dependencies] matrix-sdk-base = { path = "../matrix-sdk-base", features = ["testing"] } @@ -47,4 +49,5 @@ matrix-sdk-crypto = { path = "../matrix-sdk-crypto", features = ["testing"] } matrix-sdk-test = { path = "../matrix-sdk-test" } once_cell = "1.10.0" tempfile = "3.3.0" +glob = "0.3.0" tokio = { version = "1.17.0", default-features = false, features = ["rt-multi-thread", "macros"] } diff --git a/crates/matrix-sdk-sled/src/lib.rs b/crates/matrix-sdk-sled/src/lib.rs index fdc484e99..3fbbfb69c 100644 --- a/crates/matrix-sdk-sled/src/lib.rs +++ b/crates/matrix-sdk-sled/src/lib.rs @@ -16,7 +16,7 @@ mod state_store; #[cfg(feature = "crypto-store")] pub use cryptostore::SledStore as CryptoStore; #[cfg(feature = "state-store")] -pub use state_store::SledStore as StateStore; +pub use state_store::{SledStore as StateStore, SledStoreBuilder as SledStateStoreBuilder}; /// All the errors that can occur when opening a sled store. #[derive(Error, Debug)] @@ -61,11 +61,13 @@ pub fn make_store_config( #[cfg(not(feature = "crypto-store"))] { - let state_store = if let Some(passphrase) = passphrase { - StateStore::open_with_passphrase(path, passphrase)? - } else { - StateStore::open_with_path(path)? + let mut store_builder = SledStateStoreBuilder::default(); + store_builder.path(path.as_ref().to_path_buf()); + + if let Some(passphrase) = passphrase { + store_builder.passphrase(passphrase.to_owned()); }; + let state_store = store_builder.build().map_err(StoreError::backend)?; Ok(StoreConfig::new().state_store(Box::new(state_store))) } @@ -78,13 +80,14 @@ fn open_stores_with_path( path: impl AsRef, passphrase: Option<&str>, ) -> Result<(Box, Box), OpenStoreError> { + let mut store_builder = SledStateStoreBuilder::default(); + store_builder.path(path.as_ref().to_path_buf()); + if let Some(passphrase) = passphrase { - let state_store = StateStore::open_with_passphrase(path, passphrase)?; - let crypto_store = state_store.open_crypto_store()?; - Ok((Box::new(state_store), Box::new(crypto_store))) - } else { - let state_store = StateStore::open_with_path(path)?; - let crypto_store = state_store.open_crypto_store()?; - Ok((Box::new(state_store), Box::new(crypto_store))) - } + store_builder.passphrase(passphrase.to_owned()); + }; + + let state_store = store_builder.build().map_err(StoreError::backend)?; + let crypto_store = state_store.open_crypto_store()?; + Ok((Box::new(state_store), Box::new(crypto_store))) } diff --git a/crates/matrix-sdk-sled/src/state_store.rs b/crates/matrix-sdk-sled/src/state_store.rs index c6a925f28..ea70a6936 100644 --- a/crates/matrix-sdk-sled/src/state_store.rs +++ b/crates/matrix-sdk-sled/src/state_store.rs @@ -16,12 +16,13 @@ use std::{ collections::BTreeSet, path::{Path, PathBuf}, sync::Arc, - time::Instant, + time::{Instant, SystemTime, UNIX_EPOCH}, }; #[cfg(feature = "experimental-timeline")] use async_stream::stream; use async_trait::async_trait; +use derive_builder::Builder; use futures_core::stream::Stream; use futures_util::stream::{self, StreamExt, TryStreamExt}; use matrix_sdk_base::{ @@ -82,6 +83,29 @@ pub enum SledStoreError { Identifier(#[from] IdParseError), #[error(transparent)] Task(#[from] tokio::task::JoinError), + #[error(transparent)] + Io(#[from] std::io::Error), + #[error(transparent)] + FsExtra(#[from] fs_extra::error::Error), + #[error("Can't migrate {path} from {old_version} to {new_version} without deleting data. See MigrationConflictStrategy for ways to configure.")] + MigrationConflict { path: PathBuf, old_version: usize, new_version: usize }, +} + +/// Sometimes Migrations can't procede without having to drop existing +/// data. This allows you to configure, how these cases should be handled. +#[allow(dead_code)] +#[derive(PartialEq, Clone, Debug)] +pub enum MigrationConflictStrategy { + /// Just drop the data, we don't care that we have to sync again + Drop, + /// Raise a `SledStoreError::MigrationConflict` error with the path to the + /// DB in question. The caller then has to take care about what they want + /// to do and try again after. + Raise, + /// _Default_: The _entire_ database is backed up under `$path.$timestamp.backup` + /// (this includes the crypto store if they are linked), before the state tables + /// are dropped. + BackupAndDrop, } impl From> for SledStoreError { @@ -150,8 +174,129 @@ const TIMELINE_METADATA: &str = "timeline-metadata"; #[cfg(feature = "experimental-timeline")] const TIMELINE: &str = "timeline"; +const ALL_DB_STORES: &[&str] = &[ + ACCOUNT_DATA, + SYNC_TOKEN, + DISPLAY_NAME, + INVITED_USER_ID, + JOINED_USER_ID, + MEDIA, + MEMBER, + PRESENCE, + PROFILE, + ROOM_ACCOUNT_DATA, + #[cfg(feature = "experimental-timeline")] + ROOM_EVENT_ID_POSITION, + ROOM_EVENT_RECEIPT, + ROOM_INFO, + ROOM_STATE, + ROOM_USER_RECEIPT, + ROOM, + STRIPPED_INVITED_USER_ID, + STRIPPED_JOINED_USER_ID, + STRIPPED_ROOM_INFO, + STRIPPED_ROOM_MEMBER, + STRIPPED_ROOM_STATE, + #[cfg(feature = "experimental-timeline")] + ROOM_EVENT_ID_POSITION, + #[cfg(feature = "experimental-timeline")] + TIMELINE_METADATA, + #[cfg(feature = "experimental-timeline")] + TIMELINE, +]; +const ALL_GLOBAL_KEYS: &[&str] = &[VERSION_KEY]; + type Result = std::result::Result; +#[derive(Builder, Debug, PartialEq)] +#[builder(name = "SledStoreBuilder", build_fn(skip))] +pub struct SledStoreBuilderConfig { + path: PathBuf, + passphrase: String, + #[builder(default = "MigrationConflictStrategy::BackupAndDrop")] + migration_conflict_strategy: MigrationConflictStrategy, +} + +impl SledStoreBuilder { + pub fn build(&mut self) -> Result { + let is_temp = self.path.is_none(); + + let mut cfg = Config::new().temporary(is_temp); + + let path = if let Some(path) = &self.path { + let path = path.join("matrix-sdk-store"); + + cfg = cfg.path(&path); + Some(path) + } else { + None + }; + + let db = cfg.open().map_err(StoreError::backend)?; + + let store_cipher = if let Some(passphrase) = &self.passphrase { + if let Some(inner) = db.get("store_cipher".encode())? { + Some(StoreCipher::import(passphrase, &inner)?.into()) + } else { + let cipher = StoreCipher::new()?; + db.insert("store_cipher".encode(), cipher.export(passphrase)?)?; + Some(cipher.into()) + } + } else { + None + }; + + let mut store = SledStore::open_helper(db, path, store_cipher)?; + + let migration_res = store.upgrade(); + if let Err(SledStoreError::MigrationConflict { path, .. }) = &migration_res { + // how are supposed to react about this? + match self + .migration_conflict_strategy + .as_ref() + .unwrap_or(&MigrationConflictStrategy::BackupAndDrop) + { + MigrationConflictStrategy::BackupAndDrop => { + let mut new_path = path.clone(); + new_path.set_extension(format!( + "{}.backup", + SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("time doesn't go backwards") + .as_secs() + )); + fs_extra::dir::create_all(&new_path, false)?; + fs_extra::dir::copy(path, new_path, &fs_extra::dir::CopyOptions::new())?; + store.drop_tables()?; + return self.build(); + } + MigrationConflictStrategy::Drop => { + store.drop_tables()?; + return self.build(); + } + MigrationConflictStrategy::Raise => migration_res?, + } + } else { + migration_res?; + } + + Ok(store) + } + + // testing only + #[cfg(test)] + fn build_encrypted() -> StoreResult { + let db = Config::new().temporary(true).open().map_err(StoreError::backend)?; + + SledStore::open_helper( + db, + None, + Some(StoreCipher::new().expect("can't create store cipher").into()), + ) + .map_err(|e| e.into()) + } +} + #[derive(Clone)] pub struct SledStore { path: Option, @@ -195,6 +340,7 @@ impl std::fmt::Debug for SledStore { } } +#[allow(deprecated)] impl SledStore { fn open_helper( db: Db, @@ -235,7 +381,7 @@ impl SledStore { #[cfg(feature = "experimental-timeline")] let room_event_id_to_position = db.open_tree(ROOM_EVENT_ID_POSITION)?; - let mut database = Self { + Ok(Self { path, inner: db, store_cipher, @@ -265,64 +411,47 @@ impl SledStore { room_timeline_metadata, #[cfg(feature = "experimental-timeline")] room_event_id_to_position, - }; - - database.upgrade()?; - Ok(database) + }) } + #[deprecated(note = "Use SledStoreBuilder instead.")] pub fn open() -> StoreResult { - let db = Config::new().temporary(true).open().map_err(StoreError::backend)?; - - SledStore::open_helper(db, None, None).map_err(|e| e.into()) - } - - // testing only - #[cfg(test)] - fn open_encrypted() -> StoreResult { - let db = Config::new().temporary(true).open().map_err(StoreError::backend)?; - - SledStore::open_helper( - db, - None, - Some(StoreCipher::new().expect("can't create store cipher").into()), - ) - .map_err(|e| e.into()) + SledStoreBuilder::default().build().map_err(StoreError::backend) } + #[deprecated(note = "Use SledStoreBuilder instead.")] pub fn open_with_passphrase(path: impl AsRef, passphrase: &str) -> StoreResult { - Self::inner_open_with_passphrase(path, passphrase).map_err(|e| e.into()) - } - - fn inner_open_with_passphrase(path: impl AsRef, passphrase: &str) -> Result { - let path = path.as_ref().join("matrix-sdk-state"); - let db = Config::new().temporary(false).path(&path).open()?; - - let store_cipher = if let Some(inner) = db.get("store_cipher".encode())? { - StoreCipher::import(passphrase, &inner)? - } else { - let cipher = StoreCipher::new()?; - db.insert("store_cipher".encode(), cipher.export(passphrase)?)?; - cipher - } - .into(); - - SledStore::open_helper(db, Some(path), Some(store_cipher)) + SledStoreBuilder::default() + .path(path.as_ref().into()) + .passphrase(passphrase.to_owned()) + .build() + .map_err(StoreError::backend) } + #[deprecated(note = "Use SledStoreBuilder instead.")] pub fn open_with_path(path: impl AsRef) -> StoreResult { - Self::inner_open_with_path(path).map_err(|e| e.into()) + SledStoreBuilder::default().path(path.as_ref().into()).build().map_err(StoreError::backend) } - fn inner_open_with_path(path: impl AsRef) -> Result { - let path = path.as_ref().join("matrix-sdk-state"); - let db = Config::new().temporary(false).path(&path).open()?; + fn drop_tables(self) -> StoreResult<()> { + for name in ALL_DB_STORES { + self.inner.drop_tree(name).map_err(StoreError::backend)?; + } + for name in ALL_GLOBAL_KEYS { + self.inner.remove(name).map_err(StoreError::backend)?; + } - SledStore::open_helper(db, Some(path), None) + Ok(()) } - fn upgrade(&mut self) -> StoreResult<()> { - let db_version = self.inner.get(VERSION_KEY).map_err(StoreError::backend)?.map(|v| { + fn set_db_version(&self, version: u8) -> Result<()> { + self.inner.insert(VERSION_KEY, version.to_be_bytes().as_ref())?; + self.inner.flush()?; + Ok(()) + } + + fn upgrade(&mut self) -> Result<()> { + let db_version = self.inner.get(VERSION_KEY)?.map(|v| { let (version_bytes, _) = v.split_at(std::mem::size_of::()); u8::from_be_bytes(version_bytes.try_into().unwrap_or_default()) }); @@ -330,11 +459,7 @@ impl SledStore { let old_version = match db_version { None => { // we are fresh, let's write the current version - self.inner - .insert(VERSION_KEY, DATABASE_VERSION.to_be_bytes().as_ref()) - .map_err(StoreError::backend)?; - self.inner.flush().map_err(StoreError::backend)?; - return Ok(()); + return self.set_db_version(DATABASE_VERSION); } Some(version) if version == DATABASE_VERSION => { // current, we don't have to do anything @@ -352,52 +477,25 @@ impl SledStore { if old_version == 1 { if self.store_cipher.is_some() { // we stored some fields un-encrypted. Drop them to enforce re-creation - let db = &self.inner; - - db.drop_tree(JOINED_USER_ID).map_err(StoreError::backend)?; - db.drop_tree(INVITED_USER_ID).map_err(StoreError::backend)?; - db.drop_tree(STRIPPED_JOINED_USER_ID).map_err(StoreError::backend)?; - db.drop_tree(STRIPPED_INVITED_USER_ID).map_err(StoreError::backend)?; - db.drop_tree(CUSTOM).map_err(StoreError::backend)?; - db.drop_tree(MEDIA).map_err(StoreError::backend)?; - - self.session.remove(SYNC_TOKEN.encode()).map_err(StoreError::backend)?; - - self.stripped_joined_user_ids = - db.open_tree(STRIPPED_JOINED_USER_ID).map_err(StoreError::backend)?; - self.stripped_invited_user_ids = - db.open_tree(STRIPPED_INVITED_USER_ID).map_err(StoreError::backend)?; - self.joined_user_ids = db.open_tree(JOINED_USER_ID).map_err(StoreError::backend)?; - self.invited_user_ids = - db.open_tree(INVITED_USER_ID).map_err(StoreError::backend)?; - self.custom = db.open_tree(CUSTOM).map_err(StoreError::backend)?; - self.media = db.open_tree(MEDIA).map_err(StoreError::backend)?; - - #[cfg(feature = "experimental-timeline")] - { - db.drop_tree(TIMELINE).map_err(StoreError::backend)?; - db.drop_tree(TIMELINE_METADATA).map_err(StoreError::backend)?; - db.drop_tree(ROOM_EVENT_ID_POSITION).map_err(StoreError::backend)?; - - self.room_timeline = db.open_tree(TIMELINE).map_err(StoreError::backend)?; - self.room_timeline_metadata = - db.open_tree(TIMELINE_METADATA).map_err(StoreError::backend)?; - self.room_event_id_to_position = - db.open_tree(ROOM_EVENT_ID_POSITION).map_err(StoreError::backend)?; - } + return Err(SledStoreError::MigrationConflict { + path: self.path.take().expect("Path must exist for a migration to fail"), + old_version: old_version.into(), + new_version: DATABASE_VERSION.into(), + }); } - // successfully upgraded - self.inner - .insert(VERSION_KEY, DATABASE_VERSION.to_be_bytes().as_ref()) - .map_err(StoreError::backend)?; - self.inner.flush().map_err(StoreError::backend)?; + // no migration to handle + self.set_db_version(2u8)?; return Ok(()); } // FUTURE UPGRADE CODE GOES HERE // can't upgrade from that version to the new one - Err(StoreError::UnsupportedDatabaseVersion(old_version.into(), DATABASE_VERSION.into())) + Err(SledStoreError::MigrationConflict { + path: self.path.take().expect("Path must exist for a migration to fail"), + old_version: old_version.into(), + new_version: DATABASE_VERSION.into(), + }) } /// Open a `CryptoStore` that uses the same database as this store. @@ -1632,10 +1730,10 @@ struct TimelineMetadata { mod tests { use matrix_sdk_base::statestore_integration_tests; - use super::{SledStore, StateStore, StoreResult}; + use super::{SledStoreBuilder, StateStore, StoreResult}; async fn get_store() -> StoreResult { - SledStore::open().map_err(Into::into) + SledStoreBuilder::default().build().map_err(Into::into) } statestore_integration_tests! { integration } @@ -1645,11 +1743,102 @@ mod tests { mod encrypted_tests { use matrix_sdk_base::statestore_integration_tests; - use super::{SledStore, StateStore, StoreResult}; + use super::{SledStoreBuilder, StateStore, StoreResult}; async fn get_store() -> StoreResult { - SledStore::open_encrypted().map_err(Into::into) + SledStoreBuilder::build_encrypted().map_err(Into::into) } statestore_integration_tests! { integration } } + +#[cfg(test)] +mod migration { + use super::{MigrationConflictStrategy, Result, SledStoreBuilder, SledStoreError}; + use matrix_sdk_test::async_test; + use tempfile::TempDir; + + #[async_test] + pub async fn migrating_v1_to_2_plain() -> Result<()> { + let folder = TempDir::new()?; + + let store = SledStoreBuilder::default().path(folder.path().to_path_buf()).build()?; + + store.set_db_version(1u8)?; + drop(store); + + // this transparently migrates to the latest version + let _store = SledStoreBuilder::default().path(folder.path().to_path_buf()).build()?; + Ok(()) + } + + #[async_test] + pub async fn migrating_v1_to_2_with_pw_backed_up() -> Result<()> { + let folder = TempDir::new()?; + + let store = SledStoreBuilder::default() + .path(folder.path().to_path_buf()) + .passphrase("something".to_owned()) + .build()?; + + store.set_db_version(1u8)?; + drop(store); + + // this transparently creates a backup and a fresh db + let _store = SledStoreBuilder::default() + .path(folder.path().to_path_buf()) + .passphrase("something".to_owned()) + .build()?; + assert_eq!(std::fs::read_dir(folder.path())?.count(), 2); + Ok(()) + } + + #[async_test] + pub async fn migrating_v1_to_2_with_pw_drop() -> Result<()> { + let folder = TempDir::new()?; + + let store = SledStoreBuilder::default() + .path(folder.path().to_path_buf()) + .passphrase("other thing".to_owned()) + .build()?; + + store.set_db_version(1u8)?; + drop(store); + + // this transparently creates a backup and a fresh db + let _store = SledStoreBuilder::default() + .path(folder.path().to_path_buf()) + .passphrase("other thing".to_owned()) + .migration_conflict_strategy(MigrationConflictStrategy::Drop) + .build()?; + assert_eq!(std::fs::read_dir(folder.path())?.count(), 1); + Ok(()) + } + + #[async_test] + pub async fn migrating_v1_to_2_with_pw_raises() -> Result<()> { + let folder = TempDir::new()?; + + let store = SledStoreBuilder::default() + .path(folder.path().to_path_buf()) + .passphrase("secret".to_owned()) + .build()?; + + store.set_db_version(1u8)?; + drop(store); + + // this transparently creates a backup and a fresh db + let res = SledStoreBuilder::default() + .path(folder.path().to_path_buf()) + .passphrase("secret".to_owned()) + .migration_conflict_strategy(MigrationConflictStrategy::Raise) + .build(); + if let Err(SledStoreError::MigrationConflict { .. }) = res { + // all good + } else { + panic!("Didn't raise the expected error: {:?}", res); + } + assert_eq!(std::fs::read_dir(folder.path())?.count(), 1); + Ok(()) + } +}