feat(sled): Introduce SledStoreBuilder, allow migration conflict strategy configuration

This commit is contained in:
Benjamin Kampmann
2022-07-06 16:06:47 +02:00
parent d465b70bea
commit dc40309cbe
3 changed files with 302 additions and 107 deletions

View File

@@ -27,6 +27,7 @@ experimental-timeline = ["matrix-sdk-base/experimental-timeline"]
async-stream = "0.3.3"
async-trait = "0.1.53"
dashmap = "5.2.0"
derive_builder = "0.11.2"
futures-core = "0.3.21"
futures-util = { version = "0.3.21", default-features = false }
matrix-sdk-base = { version = "0.5.0", path = "../matrix-sdk-base", optional = true }
@@ -40,6 +41,7 @@ sled = "0.34.7"
thiserror = "1.0.30"
tokio = { version = "1.17.0", default-features = false, features = ["sync", "fs"] }
tracing = "0.1.34"
fs_extra = "1.2.0"
[dev-dependencies]
matrix-sdk-base = { path = "../matrix-sdk-base", features = ["testing"] }
@@ -47,4 +49,5 @@ matrix-sdk-crypto = { path = "../matrix-sdk-crypto", features = ["testing"] }
matrix-sdk-test = { path = "../matrix-sdk-test" }
once_cell = "1.10.0"
tempfile = "3.3.0"
glob = "0.3.0"
tokio = { version = "1.17.0", default-features = false, features = ["rt-multi-thread", "macros"] }

View File

@@ -16,7 +16,7 @@ mod state_store;
#[cfg(feature = "crypto-store")]
pub use cryptostore::SledStore as CryptoStore;
#[cfg(feature = "state-store")]
pub use state_store::SledStore as StateStore;
pub use state_store::{SledStore as StateStore, SledStoreBuilder as SledStateStoreBuilder};
/// All the errors that can occur when opening a sled store.
#[derive(Error, Debug)]
@@ -61,11 +61,13 @@ pub fn make_store_config(
#[cfg(not(feature = "crypto-store"))]
{
let state_store = if let Some(passphrase) = passphrase {
StateStore::open_with_passphrase(path, passphrase)?
} else {
StateStore::open_with_path(path)?
let mut store_builder = SledStateStoreBuilder::default();
store_builder.path(path.as_ref().to_path_buf());
if let Some(passphrase) = passphrase {
store_builder.passphrase(passphrase.to_owned());
};
let state_store = store_builder.build().map_err(StoreError::backend)?;
Ok(StoreConfig::new().state_store(Box::new(state_store)))
}
@@ -78,13 +80,14 @@ fn open_stores_with_path(
path: impl AsRef<std::path::Path>,
passphrase: Option<&str>,
) -> Result<(Box<StateStore>, Box<CryptoStore>), OpenStoreError> {
let mut store_builder = SledStateStoreBuilder::default();
store_builder.path(path.as_ref().to_path_buf());
if let Some(passphrase) = passphrase {
let state_store = StateStore::open_with_passphrase(path, passphrase)?;
let crypto_store = state_store.open_crypto_store()?;
Ok((Box::new(state_store), Box::new(crypto_store)))
} else {
let state_store = StateStore::open_with_path(path)?;
let crypto_store = state_store.open_crypto_store()?;
Ok((Box::new(state_store), Box::new(crypto_store)))
}
store_builder.passphrase(passphrase.to_owned());
};
let state_store = store_builder.build().map_err(StoreError::backend)?;
let crypto_store = state_store.open_crypto_store()?;
Ok((Box::new(state_store), Box::new(crypto_store)))
}

View File

@@ -16,12 +16,13 @@ use std::{
collections::BTreeSet,
path::{Path, PathBuf},
sync::Arc,
time::Instant,
time::{Instant, SystemTime, UNIX_EPOCH},
};
#[cfg(feature = "experimental-timeline")]
use async_stream::stream;
use async_trait::async_trait;
use derive_builder::Builder;
use futures_core::stream::Stream;
use futures_util::stream::{self, StreamExt, TryStreamExt};
use matrix_sdk_base::{
@@ -82,6 +83,29 @@ pub enum SledStoreError {
Identifier(#[from] IdParseError),
#[error(transparent)]
Task(#[from] tokio::task::JoinError),
#[error(transparent)]
Io(#[from] std::io::Error),
#[error(transparent)]
FsExtra(#[from] fs_extra::error::Error),
#[error("Can't migrate {path} from {old_version} to {new_version} without deleting data. See MigrationConflictStrategy for ways to configure.")]
MigrationConflict { path: PathBuf, old_version: usize, new_version: usize },
}
/// Sometimes Migrations can't procede without having to drop existing
/// data. This allows you to configure, how these cases should be handled.
#[allow(dead_code)]
#[derive(PartialEq, Clone, Debug)]
pub enum MigrationConflictStrategy {
/// Just drop the data, we don't care that we have to sync again
Drop,
/// Raise a `SledStoreError::MigrationConflict` error with the path to the
/// DB in question. The caller then has to take care about what they want
/// to do and try again after.
Raise,
/// _Default_: The _entire_ database is backed up under `$path.$timestamp.backup`
/// (this includes the crypto store if they are linked), before the state tables
/// are dropped.
BackupAndDrop,
}
impl From<TransactionError<SledStoreError>> for SledStoreError {
@@ -150,8 +174,129 @@ const TIMELINE_METADATA: &str = "timeline-metadata";
#[cfg(feature = "experimental-timeline")]
const TIMELINE: &str = "timeline";
const ALL_DB_STORES: &[&str] = &[
ACCOUNT_DATA,
SYNC_TOKEN,
DISPLAY_NAME,
INVITED_USER_ID,
JOINED_USER_ID,
MEDIA,
MEMBER,
PRESENCE,
PROFILE,
ROOM_ACCOUNT_DATA,
#[cfg(feature = "experimental-timeline")]
ROOM_EVENT_ID_POSITION,
ROOM_EVENT_RECEIPT,
ROOM_INFO,
ROOM_STATE,
ROOM_USER_RECEIPT,
ROOM,
STRIPPED_INVITED_USER_ID,
STRIPPED_JOINED_USER_ID,
STRIPPED_ROOM_INFO,
STRIPPED_ROOM_MEMBER,
STRIPPED_ROOM_STATE,
#[cfg(feature = "experimental-timeline")]
ROOM_EVENT_ID_POSITION,
#[cfg(feature = "experimental-timeline")]
TIMELINE_METADATA,
#[cfg(feature = "experimental-timeline")]
TIMELINE,
];
const ALL_GLOBAL_KEYS: &[&str] = &[VERSION_KEY];
type Result<A, E = SledStoreError> = std::result::Result<A, E>;
#[derive(Builder, Debug, PartialEq)]
#[builder(name = "SledStoreBuilder", build_fn(skip))]
pub struct SledStoreBuilderConfig {
path: PathBuf,
passphrase: String,
#[builder(default = "MigrationConflictStrategy::BackupAndDrop")]
migration_conflict_strategy: MigrationConflictStrategy,
}
impl SledStoreBuilder {
pub fn build(&mut self) -> Result<SledStore> {
let is_temp = self.path.is_none();
let mut cfg = Config::new().temporary(is_temp);
let path = if let Some(path) = &self.path {
let path = path.join("matrix-sdk-store");
cfg = cfg.path(&path);
Some(path)
} else {
None
};
let db = cfg.open().map_err(StoreError::backend)?;
let store_cipher = if let Some(passphrase) = &self.passphrase {
if let Some(inner) = db.get("store_cipher".encode())? {
Some(StoreCipher::import(passphrase, &inner)?.into())
} else {
let cipher = StoreCipher::new()?;
db.insert("store_cipher".encode(), cipher.export(passphrase)?)?;
Some(cipher.into())
}
} else {
None
};
let mut store = SledStore::open_helper(db, path, store_cipher)?;
let migration_res = store.upgrade();
if let Err(SledStoreError::MigrationConflict { path, .. }) = &migration_res {
// how are supposed to react about this?
match self
.migration_conflict_strategy
.as_ref()
.unwrap_or(&MigrationConflictStrategy::BackupAndDrop)
{
MigrationConflictStrategy::BackupAndDrop => {
let mut new_path = path.clone();
new_path.set_extension(format!(
"{}.backup",
SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("time doesn't go backwards")
.as_secs()
));
fs_extra::dir::create_all(&new_path, false)?;
fs_extra::dir::copy(path, new_path, &fs_extra::dir::CopyOptions::new())?;
store.drop_tables()?;
return self.build();
}
MigrationConflictStrategy::Drop => {
store.drop_tables()?;
return self.build();
}
MigrationConflictStrategy::Raise => migration_res?,
}
} else {
migration_res?;
}
Ok(store)
}
// testing only
#[cfg(test)]
fn build_encrypted() -> StoreResult<SledStore> {
let db = Config::new().temporary(true).open().map_err(StoreError::backend)?;
SledStore::open_helper(
db,
None,
Some(StoreCipher::new().expect("can't create store cipher").into()),
)
.map_err(|e| e.into())
}
}
#[derive(Clone)]
pub struct SledStore {
path: Option<PathBuf>,
@@ -195,6 +340,7 @@ impl std::fmt::Debug for SledStore {
}
}
#[allow(deprecated)]
impl SledStore {
fn open_helper(
db: Db,
@@ -235,7 +381,7 @@ impl SledStore {
#[cfg(feature = "experimental-timeline")]
let room_event_id_to_position = db.open_tree(ROOM_EVENT_ID_POSITION)?;
let mut database = Self {
Ok(Self {
path,
inner: db,
store_cipher,
@@ -265,64 +411,47 @@ impl SledStore {
room_timeline_metadata,
#[cfg(feature = "experimental-timeline")]
room_event_id_to_position,
};
database.upgrade()?;
Ok(database)
})
}
#[deprecated(note = "Use SledStoreBuilder instead.")]
pub fn open() -> StoreResult<Self> {
let db = Config::new().temporary(true).open().map_err(StoreError::backend)?;
SledStore::open_helper(db, None, None).map_err(|e| e.into())
}
// testing only
#[cfg(test)]
fn open_encrypted() -> StoreResult<Self> {
let db = Config::new().temporary(true).open().map_err(StoreError::backend)?;
SledStore::open_helper(
db,
None,
Some(StoreCipher::new().expect("can't create store cipher").into()),
)
.map_err(|e| e.into())
SledStoreBuilder::default().build().map_err(StoreError::backend)
}
#[deprecated(note = "Use SledStoreBuilder instead.")]
pub fn open_with_passphrase(path: impl AsRef<Path>, passphrase: &str) -> StoreResult<Self> {
Self::inner_open_with_passphrase(path, passphrase).map_err(|e| e.into())
}
fn inner_open_with_passphrase(path: impl AsRef<Path>, passphrase: &str) -> Result<Self> {
let path = path.as_ref().join("matrix-sdk-state");
let db = Config::new().temporary(false).path(&path).open()?;
let store_cipher = if let Some(inner) = db.get("store_cipher".encode())? {
StoreCipher::import(passphrase, &inner)?
} else {
let cipher = StoreCipher::new()?;
db.insert("store_cipher".encode(), cipher.export(passphrase)?)?;
cipher
}
.into();
SledStore::open_helper(db, Some(path), Some(store_cipher))
SledStoreBuilder::default()
.path(path.as_ref().into())
.passphrase(passphrase.to_owned())
.build()
.map_err(StoreError::backend)
}
#[deprecated(note = "Use SledStoreBuilder instead.")]
pub fn open_with_path(path: impl AsRef<Path>) -> StoreResult<Self> {
Self::inner_open_with_path(path).map_err(|e| e.into())
SledStoreBuilder::default().path(path.as_ref().into()).build().map_err(StoreError::backend)
}
fn inner_open_with_path(path: impl AsRef<Path>) -> Result<Self> {
let path = path.as_ref().join("matrix-sdk-state");
let db = Config::new().temporary(false).path(&path).open()?;
fn drop_tables(self) -> StoreResult<()> {
for name in ALL_DB_STORES {
self.inner.drop_tree(name).map_err(StoreError::backend)?;
}
for name in ALL_GLOBAL_KEYS {
self.inner.remove(name).map_err(StoreError::backend)?;
}
SledStore::open_helper(db, Some(path), None)
Ok(())
}
fn upgrade(&mut self) -> StoreResult<()> {
let db_version = self.inner.get(VERSION_KEY).map_err(StoreError::backend)?.map(|v| {
fn set_db_version(&self, version: u8) -> Result<()> {
self.inner.insert(VERSION_KEY, version.to_be_bytes().as_ref())?;
self.inner.flush()?;
Ok(())
}
fn upgrade(&mut self) -> Result<()> {
let db_version = self.inner.get(VERSION_KEY)?.map(|v| {
let (version_bytes, _) = v.split_at(std::mem::size_of::<u8>());
u8::from_be_bytes(version_bytes.try_into().unwrap_or_default())
});
@@ -330,11 +459,7 @@ impl SledStore {
let old_version = match db_version {
None => {
// we are fresh, let's write the current version
self.inner
.insert(VERSION_KEY, DATABASE_VERSION.to_be_bytes().as_ref())
.map_err(StoreError::backend)?;
self.inner.flush().map_err(StoreError::backend)?;
return Ok(());
return self.set_db_version(DATABASE_VERSION);
}
Some(version) if version == DATABASE_VERSION => {
// current, we don't have to do anything
@@ -352,52 +477,25 @@ impl SledStore {
if old_version == 1 {
if self.store_cipher.is_some() {
// we stored some fields un-encrypted. Drop them to enforce re-creation
let db = &self.inner;
db.drop_tree(JOINED_USER_ID).map_err(StoreError::backend)?;
db.drop_tree(INVITED_USER_ID).map_err(StoreError::backend)?;
db.drop_tree(STRIPPED_JOINED_USER_ID).map_err(StoreError::backend)?;
db.drop_tree(STRIPPED_INVITED_USER_ID).map_err(StoreError::backend)?;
db.drop_tree(CUSTOM).map_err(StoreError::backend)?;
db.drop_tree(MEDIA).map_err(StoreError::backend)?;
self.session.remove(SYNC_TOKEN.encode()).map_err(StoreError::backend)?;
self.stripped_joined_user_ids =
db.open_tree(STRIPPED_JOINED_USER_ID).map_err(StoreError::backend)?;
self.stripped_invited_user_ids =
db.open_tree(STRIPPED_INVITED_USER_ID).map_err(StoreError::backend)?;
self.joined_user_ids = db.open_tree(JOINED_USER_ID).map_err(StoreError::backend)?;
self.invited_user_ids =
db.open_tree(INVITED_USER_ID).map_err(StoreError::backend)?;
self.custom = db.open_tree(CUSTOM).map_err(StoreError::backend)?;
self.media = db.open_tree(MEDIA).map_err(StoreError::backend)?;
#[cfg(feature = "experimental-timeline")]
{
db.drop_tree(TIMELINE).map_err(StoreError::backend)?;
db.drop_tree(TIMELINE_METADATA).map_err(StoreError::backend)?;
db.drop_tree(ROOM_EVENT_ID_POSITION).map_err(StoreError::backend)?;
self.room_timeline = db.open_tree(TIMELINE).map_err(StoreError::backend)?;
self.room_timeline_metadata =
db.open_tree(TIMELINE_METADATA).map_err(StoreError::backend)?;
self.room_event_id_to_position =
db.open_tree(ROOM_EVENT_ID_POSITION).map_err(StoreError::backend)?;
}
return Err(SledStoreError::MigrationConflict {
path: self.path.take().expect("Path must exist for a migration to fail"),
old_version: old_version.into(),
new_version: DATABASE_VERSION.into(),
});
}
// successfully upgraded
self.inner
.insert(VERSION_KEY, DATABASE_VERSION.to_be_bytes().as_ref())
.map_err(StoreError::backend)?;
self.inner.flush().map_err(StoreError::backend)?;
// no migration to handle
self.set_db_version(2u8)?;
return Ok(());
}
// FUTURE UPGRADE CODE GOES HERE
// can't upgrade from that version to the new one
Err(StoreError::UnsupportedDatabaseVersion(old_version.into(), DATABASE_VERSION.into()))
Err(SledStoreError::MigrationConflict {
path: self.path.take().expect("Path must exist for a migration to fail"),
old_version: old_version.into(),
new_version: DATABASE_VERSION.into(),
})
}
/// Open a `CryptoStore` that uses the same database as this store.
@@ -1632,10 +1730,10 @@ struct TimelineMetadata {
mod tests {
use matrix_sdk_base::statestore_integration_tests;
use super::{SledStore, StateStore, StoreResult};
use super::{SledStoreBuilder, StateStore, StoreResult};
async fn get_store() -> StoreResult<impl StateStore> {
SledStore::open().map_err(Into::into)
SledStoreBuilder::default().build().map_err(Into::into)
}
statestore_integration_tests! { integration }
@@ -1645,11 +1743,102 @@ mod tests {
mod encrypted_tests {
use matrix_sdk_base::statestore_integration_tests;
use super::{SledStore, StateStore, StoreResult};
use super::{SledStoreBuilder, StateStore, StoreResult};
async fn get_store() -> StoreResult<impl StateStore> {
SledStore::open_encrypted().map_err(Into::into)
SledStoreBuilder::build_encrypted().map_err(Into::into)
}
statestore_integration_tests! { integration }
}
#[cfg(test)]
mod migration {
use super::{MigrationConflictStrategy, Result, SledStoreBuilder, SledStoreError};
use matrix_sdk_test::async_test;
use tempfile::TempDir;
#[async_test]
pub async fn migrating_v1_to_2_plain() -> Result<()> {
let folder = TempDir::new()?;
let store = SledStoreBuilder::default().path(folder.path().to_path_buf()).build()?;
store.set_db_version(1u8)?;
drop(store);
// this transparently migrates to the latest version
let _store = SledStoreBuilder::default().path(folder.path().to_path_buf()).build()?;
Ok(())
}
#[async_test]
pub async fn migrating_v1_to_2_with_pw_backed_up() -> Result<()> {
let folder = TempDir::new()?;
let store = SledStoreBuilder::default()
.path(folder.path().to_path_buf())
.passphrase("something".to_owned())
.build()?;
store.set_db_version(1u8)?;
drop(store);
// this transparently creates a backup and a fresh db
let _store = SledStoreBuilder::default()
.path(folder.path().to_path_buf())
.passphrase("something".to_owned())
.build()?;
assert_eq!(std::fs::read_dir(folder.path())?.count(), 2);
Ok(())
}
#[async_test]
pub async fn migrating_v1_to_2_with_pw_drop() -> Result<()> {
let folder = TempDir::new()?;
let store = SledStoreBuilder::default()
.path(folder.path().to_path_buf())
.passphrase("other thing".to_owned())
.build()?;
store.set_db_version(1u8)?;
drop(store);
// this transparently creates a backup and a fresh db
let _store = SledStoreBuilder::default()
.path(folder.path().to_path_buf())
.passphrase("other thing".to_owned())
.migration_conflict_strategy(MigrationConflictStrategy::Drop)
.build()?;
assert_eq!(std::fs::read_dir(folder.path())?.count(), 1);
Ok(())
}
#[async_test]
pub async fn migrating_v1_to_2_with_pw_raises() -> Result<()> {
let folder = TempDir::new()?;
let store = SledStoreBuilder::default()
.path(folder.path().to_path_buf())
.passphrase("secret".to_owned())
.build()?;
store.set_db_version(1u8)?;
drop(store);
// this transparently creates a backup and a fresh db
let res = SledStoreBuilder::default()
.path(folder.path().to_path_buf())
.passphrase("secret".to_owned())
.migration_conflict_strategy(MigrationConflictStrategy::Raise)
.build();
if let Err(SledStoreError::MigrationConflict { .. }) = res {
// all good
} else {
panic!("Didn't raise the expected error: {:?}", res);
}
assert_eq!(std::fs::read_dir(folder.path())?.count(), 1);
Ok(())
}
}