From 3e2bc3a514f4bb4c8b5de353e10c7119e3ee5662 Mon Sep 17 00:00:00 2001 From: Jonas Platte Date: Thu, 1 Jun 2023 11:21:55 +0200 Subject: [PATCH] Drop matrix-sdk-sled --- Cargo.lock | 95 +- benchmarks/Cargo.toml | 1 - benchmarks/benches/crypto_bench.rs | 82 - benchmarks/benches/store_bench.rs | 31 - bindings/matrix-sdk-crypto-ffi/src/machine.rs | 4 +- bindings/matrix-sdk-crypto-nodejs/Cargo.toml | 1 - .../matrix-sdk-crypto-nodejs/src/machine.rs | 20 +- .../tests/machine.test.js | 4 +- codecov.yaml | 1 - crates/matrix-sdk-sled/Cargo.toml | 54 - crates/matrix-sdk-sled/README.md | 25 - crates/matrix-sdk-sled/build.rs | 17 - crates/matrix-sdk-sled/src/crypto_store.rs | 1151 ------------- crates/matrix-sdk-sled/src/encode_key.rs | 290 ---- crates/matrix-sdk-sled/src/lib.rs | 92 -- .../src/state_store/migrations.rs | 825 ---------- crates/matrix-sdk-sled/src/state_store/mod.rs | 1434 ----------------- crates/matrix-sdk/CHANGELOG.md | 2 +- 18 files changed, 9 insertions(+), 4120 deletions(-) delete mode 100644 crates/matrix-sdk-sled/Cargo.toml delete mode 100644 crates/matrix-sdk-sled/README.md delete mode 100644 crates/matrix-sdk-sled/build.rs delete mode 100644 crates/matrix-sdk-sled/src/crypto_store.rs delete mode 100644 crates/matrix-sdk-sled/src/encode_key.rs delete mode 100644 crates/matrix-sdk-sled/src/lib.rs delete mode 100644 crates/matrix-sdk-sled/src/state_store/migrations.rs delete mode 100644 crates/matrix-sdk-sled/src/state_store/mod.rs diff --git a/Cargo.lock b/Cargo.lock index 02e6efa3d..01c98295c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -538,7 +538,6 @@ dependencies = [ "matrix-sdk", "matrix-sdk-base", "matrix-sdk-crypto", - "matrix-sdk-sled", "matrix-sdk-sqlite", "matrix-sdk-test", "pprof", @@ -1167,7 +1166,7 @@ dependencies = [ "hashbrown 0.12.3", "lock_api", "once_cell", - "parking_lot_core 0.9.7", + "parking_lot_core", ] [[package]] @@ -1634,16 +1633,6 @@ version = "2.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0845fa252299212f0389d64ba26f34fa32cfe41588355f21ed507c59a0f64541" -[[package]] -name = "fs2" -version = "0.4.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9564fc758e15025b46aa6643b1b77d047d1a56a1aea6e01002ac0c7026876213" -dependencies = [ - "libc", - "winapi", -] - [[package]] name = "fs_extra" version = "1.3.0" @@ -1770,15 +1759,6 @@ dependencies = [ "slab", ] -[[package]] -name = "fxhash" -version = "0.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c31b6d751ae2c7f11320402d34e41349dd1016f8d5d45e48c4312bc8625af50c" -dependencies = [ - "byteorder", -] - [[package]] name = "generic-array" version = "0.14.7" @@ -2778,7 +2758,6 @@ dependencies = [ "http", "matrix-sdk-common", "matrix-sdk-crypto", - "matrix-sdk-sled", "matrix-sdk-sqlite", "napi", "napi-build", @@ -2882,31 +2861,6 @@ dependencies = [ "vodozemac", ] -[[package]] -name = "matrix-sdk-sled" -version = "0.2.0" -dependencies = [ - "assert_matches", - "async-trait", - "fs_extra", - "futures-core", - "futures-util", - "glob", - "matrix-sdk-base", - "matrix-sdk-crypto", - "matrix-sdk-store-encryption", - "matrix-sdk-test", - "once_cell", - "ruma", - "serde", - "serde_json", - "sled", - "tempfile", - "thiserror", - "tokio", - "tracing", -] - [[package]] name = "matrix-sdk-sqlite" version = "0.1.0" @@ -3513,17 +3467,6 @@ version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "14f2252c834a40ed9bb5422029649578e63aa341ac401f74e719dd1afda8394e" -[[package]] -name = "parking_lot" -version = "0.11.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7d17b78036a60663b797adeaee46f5c9dfebb86948d1255007a1d6be0271ff99" -dependencies = [ - "instant", - "lock_api", - "parking_lot_core 0.8.6", -] - [[package]] name = "parking_lot" version = "0.12.1" @@ -3531,21 +3474,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3742b2c103b9f06bc9fff0a37ff4912935851bee6d36f3c02bcc755bcfec228f" dependencies = [ "lock_api", - "parking_lot_core 0.9.7", -] - -[[package]] -name = "parking_lot_core" -version = "0.8.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "60a2cfe6f0ad2bfc16aefa463b497d5c7a5ecd44a23efa72aa342d90177356dc" -dependencies = [ - "cfg-if", - "instant", - "libc", - "redox_syscall 0.2.16", - "smallvec", - "winapi", + "parking_lot_core", ] [[package]] @@ -3835,7 +3764,7 @@ dependencies = [ "log", "nix", "once_cell", - "parking_lot 0.12.1", + "parking_lot", "smallvec", "symbolic-demangle", "tempfile", @@ -4791,22 +4720,6 @@ dependencies = [ "autocfg", ] -[[package]] -name = "sled" -version = "0.34.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7f96b4737c2ce5987354855aed3797279def4ebf734436c6aa4552cf8e169935" -dependencies = [ - "crc32fast", - "crossbeam-epoch", - "crossbeam-utils", - "fs2", - "fxhash", - "libc", - "log", - "parking_lot 0.11.2", -] - [[package]] name = "sliding-sync-integration-test" version = "0.1.0" @@ -4879,7 +4792,7 @@ checksum = "f91138e76242f575eb1d3b38b4f1362f10d3a43f47d182a5b359af488a02293b" dependencies = [ "new_debug_unreachable", "once_cell", - "parking_lot 0.12.1", + "parking_lot", "phf_shared 0.10.0", "precomputed-hash", "serde", diff --git a/benchmarks/Cargo.toml b/benchmarks/Cargo.toml index a39272db6..055d02a3f 100644 --- a/benchmarks/Cargo.toml +++ b/benchmarks/Cargo.toml @@ -12,7 +12,6 @@ criterion = { version = "0.4.0", features = ["async", "async_tokio", "html_repor matrix-sdk-base = { path = "../crates/matrix-sdk-base" } matrix-sdk-crypto = { path = "../crates/matrix-sdk-crypto", version = "0.6.0"} matrix-sdk-sqlite = { path = "../crates/matrix-sdk-sqlite", version = "0.1.0", default-features = false, features = ["crypto-store"] } -matrix-sdk-sled = { path = "../crates/matrix-sdk-sled", version = "0.2.0", features = ["crypto-store"] } matrix-sdk-test = { path = "../testing/matrix-sdk-test", version = "0.6.0"} matrix-sdk = { path = "../crates/matrix-sdk" } ruma = { workspace = true } diff --git a/benchmarks/benches/crypto_bench.rs b/benchmarks/benches/crypto_bench.rs index 9e668ac74..88fb74b25 100644 --- a/benchmarks/benches/crypto_bench.rs +++ b/benchmarks/benches/crypto_bench.rs @@ -2,7 +2,6 @@ use std::{ops::Deref, sync::Arc}; use criterion::*; use matrix_sdk_crypto::{EncryptionSettings, OlmMachine}; -use matrix_sdk_sled::SledCryptoStore; use matrix_sdk_sqlite::SqliteCryptoStore; use matrix_sdk_test::response_from_file; use ruma::{ @@ -90,18 +89,6 @@ pub fn keys_query(c: &mut Criterion) { drop(machine); } - // Benchmark (deprecated) sled store. - - let dir = tempfile::tempdir().unwrap(); - let store = Arc::new(runtime.block_on(SledCryptoStore::open(dir.path(), None)).unwrap()); - let machine = - runtime.block_on(OlmMachine::with_store(alice_id(), alice_device_id(), store)).unwrap(); - - group.bench_with_input(BenchmarkId::new("sled store", &name), &response, |b, response| { - b.to_async(&runtime) - .iter(|| async { machine.mark_request_as_sent(&txn_id, response).await.unwrap() }) - }); - group.finish() } @@ -164,28 +151,6 @@ pub fn keys_claiming(c: &mut Criterion) { ) }); - group.bench_with_input(BenchmarkId::new("sled store", &name), &response, |b, response| { - b.iter_batched( - || { - let dir = tempfile::tempdir().unwrap(); - let store = - Arc::new(runtime.block_on(SledCryptoStore::open(dir.path(), None)).unwrap()); - - let machine = runtime - .block_on(OlmMachine::with_store(alice_id(), alice_device_id(), store)) - .unwrap(); - runtime - .block_on(machine.mark_request_as_sent(&txn_id, &keys_query_response)) - .unwrap(); - (machine, &runtime, &txn_id) - }, - move |(machine, runtime, txn_id)| { - runtime.block_on(machine.mark_request_as_sent(txn_id, response)).unwrap() - }, - BatchSize::SmallInput, - ) - }); - group.finish() } @@ -269,37 +234,6 @@ pub fn room_key_sharing(c: &mut Criterion) { drop(machine); } - // Benchmark (deprecated) sled store. - - let dir = tempfile::tempdir().unwrap(); - let store = Arc::new(runtime.block_on(SledCryptoStore::open(dir.path(), None)).unwrap()); - - let machine = - runtime.block_on(OlmMachine::with_store(alice_id(), alice_device_id(), store)).unwrap(); - runtime.block_on(machine.mark_request_as_sent(&txn_id, &keys_query_response)).unwrap(); - runtime.block_on(machine.mark_request_as_sent(&txn_id, &response)).unwrap(); - - group.bench_function(BenchmarkId::new("sled store", &name), |b| { - b.to_async(&runtime).iter(|| async { - let requests = machine - .share_room_key( - room_id, - users.iter().map(Deref::deref), - EncryptionSettings::default(), - ) - .await - .unwrap(); - - assert!(!requests.is_empty()); - - for request in requests { - machine.mark_request_as_sent(&request.txn_id, &to_device_response).await.unwrap(); - } - - machine.invalidate_group_session(room_id).await.unwrap(); - }) - }); - group.finish() } @@ -349,22 +283,6 @@ pub fn devices_missing_sessions_collecting(c: &mut Criterion) { drop(machine); } - // Benchmark (deprecated) sled store. - - let dir = tempfile::tempdir().unwrap(); - let store = Arc::new(runtime.block_on(SledCryptoStore::open(dir.path(), None)).unwrap()); - - let machine = - runtime.block_on(OlmMachine::with_store(alice_id(), alice_device_id(), store)).unwrap(); - - runtime.block_on(machine.mark_request_as_sent(&txn_id, &response)).unwrap(); - - group.bench_function(BenchmarkId::new("sled store", &name), |b| { - b.to_async(&runtime).iter(|| async { - machine.get_missing_sessions(users.iter().map(Deref::deref)).await.unwrap() - }) - }); - group.finish() } diff --git a/benchmarks/benches/store_bench.rs b/benchmarks/benches/store_bench.rs index 7bced77ed..e05aa7103 100644 --- a/benchmarks/benches/store_bench.rs +++ b/benchmarks/benches/store_bench.rs @@ -1,7 +1,6 @@ use criterion::*; use matrix_sdk::{config::StoreConfig, Client, RoomInfo, RoomState, Session, StateChanges}; use matrix_sdk_base::{store::MemoryStore, StateStore as _}; -use matrix_sdk_sled::SledStateStore; use matrix_sdk_sqlite::SqliteStateStore; use ruma::{device_id, user_id, RoomId}; use tokio::runtime::Builder; @@ -74,36 +73,6 @@ pub fn restore_session(c: &mut Criterion) { for encryption_password in [None, Some("hunter2")] { let encrypted_suffix = if encryption_password.is_some() { "encrypted" } else { "clear" }; - // Sled - let sled_path = tempfile::tempdir().unwrap().path().to_path_buf(); - let mut sled_store_builder = SledStateStore::builder().path(sled_path); - if let Some(password) = encryption_password { - sled_store_builder = sled_store_builder.passphrase(password.to_owned()); - } - let sled_store = sled_store_builder.build().expect("Can't create sled store"); - runtime - .block_on(sled_store.save_changes(&changes)) - .expect("initial filling of sled failed"); - - group.bench_with_input( - BenchmarkId::new(format!("sled store {encrypted_suffix}"), NAME), - &sled_store, - |b, store| { - b.to_async(&runtime).iter(|| async { - let client = Client::builder() - .homeserver_url("https://matrix.example.com") - .store_config(StoreConfig::new().state_store(store.clone())) - .build() - .await - .expect("Can't build client"); - client - .restore_session(session.clone()) - .await - .expect("couldn't restore session"); - }) - }, - ); - // Sqlite let sqlite_dir = tempfile::tempdir().unwrap(); let sqlite_store = runtime diff --git a/bindings/matrix-sdk-crypto-ffi/src/machine.rs b/bindings/matrix-sdk-crypto-ffi/src/machine.rs index f900e49d1..5ab3af7b1 100644 --- a/bindings/matrix-sdk-crypto-ffi/src/machine.rs +++ b/bindings/matrix-sdk-crypto-ffi/src/machine.rs @@ -142,8 +142,8 @@ impl OlmMachine { /// * `path` - The path where the state of the machine should be persisted. /// /// * `passphrase` - The passphrase that should be used to encrypt the data - /// at rest in the Sled store. **Warning**, if no passphrase is given, the - /// store and all its data will remain unencrypted. + /// at rest in the crypto store. **Warning**, if no passphrase is given, + /// the store and all its data will remain unencrypted. #[uniffi::constructor] pub fn new( user_id: String, diff --git a/bindings/matrix-sdk-crypto-nodejs/Cargo.toml b/bindings/matrix-sdk-crypto-nodejs/Cargo.toml index 8bed841f2..cfd5c7684 100644 --- a/bindings/matrix-sdk-crypto-nodejs/Cargo.toml +++ b/bindings/matrix-sdk-crypto-nodejs/Cargo.toml @@ -25,7 +25,6 @@ tracing = ["dep:tracing-subscriber"] [dependencies] matrix-sdk-common = { version = "0.6.0", path = "../../crates/matrix-sdk-common", features = ["js"] } -matrix-sdk-sled = { version = "0.2.0", path = "../../crates/matrix-sdk-sled", default-features = false, features = ["crypto-store"] } matrix-sdk-sqlite = { version = "0.1.0", path = "../../crates/matrix-sdk-sqlite", features = ["crypto-store"] } ruma = { workspace = true, features = ["rand"] } napi = { version = "2.9.1", default-features = false, features = ["napi6", "tokio_rt"] } diff --git a/bindings/matrix-sdk-crypto-nodejs/src/machine.rs b/bindings/matrix-sdk-crypto-nodejs/src/machine.rs index a525ec3fb..b458d76cf 100644 --- a/bindings/matrix-sdk-crypto-nodejs/src/machine.rs +++ b/bindings/matrix-sdk-crypto-nodejs/src/machine.rs @@ -59,11 +59,8 @@ impl Deref for OlmMachineInner { #[derive(Default)] #[napi] pub enum StoreType { - /// Use `matrix-sdk-sled`. - #[default] - Sled, - /// Use `matrix-sdk-sqlite`. + #[default] Sqlite, } @@ -123,21 +120,6 @@ impl OlmMachine { inner: OlmMachineInner::Opened(ManuallyDrop::new(match store_path { Some(store_path) => { let machine = match store_type.unwrap_or_default() { - StoreType::Sled => { - matrix_sdk_crypto::OlmMachine::with_store( - user_id, - device_id, - matrix_sdk_sled::SledCryptoStore::open( - store_path, - store_passphrase.as_deref(), - ) - .await - .map(Arc::new) - .map_err(into_err)?, - ) - .await - } - StoreType::Sqlite => { matrix_sdk_crypto::OlmMachine::with_store( user_id, diff --git a/bindings/matrix-sdk-crypto-nodejs/tests/machine.test.js b/bindings/matrix-sdk-crypto-nodejs/tests/machine.test.js index 0e4ed8929..cfe4758f6 100644 --- a/bindings/matrix-sdk-crypto-nodejs/tests/machine.test.js +++ b/bindings/matrix-sdk-crypto-nodejs/tests/machine.test.js @@ -25,8 +25,7 @@ const fs = require("fs/promises"); describe("StoreType", () => { test("has the correct variant values", () => { - expect(StoreType.Sled).toStrictEqual(0); - expect(StoreType.Sqlite).toStrictEqual(1); + expect(StoreType.Sqlite).toStrictEqual(0); }); }); @@ -53,7 +52,6 @@ describe(OlmMachine.name, () => { describe("can be instantiated with a store", () => { for (const [store_type, store_name] of [ - [StoreType.Sled, "sled"], [StoreType.Sqlite, "sqlite"], [null, "default"], ]) { diff --git a/codecov.yaml b/codecov.yaml index 7f6905c01..1208a1e27 100644 --- a/codecov.yaml +++ b/codecov.yaml @@ -13,7 +13,6 @@ coverage: - "crates/matrix-sdk-common/" - "crates/matrix-sdk-crypto/" - "crates/matrix-sdk-qrcode/" - - "crates/matrix-sdk-sled/" - "crates/matrix-sdk-sqlite/" - "crates/matrix-sdk-store-encryption/" # Coverage of wasm tests isn't supported at the moment, diff --git a/crates/matrix-sdk-sled/Cargo.toml b/crates/matrix-sdk-sled/Cargo.toml deleted file mode 100644 index 5fb9e1030..000000000 --- a/crates/matrix-sdk-sled/Cargo.toml +++ /dev/null @@ -1,54 +0,0 @@ -[package] -name = "matrix-sdk-sled" -version = "0.2.0" -edition = "2021" -authors = ["Damir Jelić "] -repository = "https://github.com/matrix-org/matrix-rust-sdk" -description = "Sled Storage backend for matrix-sdk for native environments" -license = "Apache-2.0" -rust-version = { workspace = true } -readme = "README.md" - -[package.metadata.docs.rs] -all-features = true -rustdoc-args = ["--cfg", "docsrs"] - -[features] -default = ["state-store"] - -state-store = ["dep:matrix-sdk-base"] -crypto-store = [ - "dep:matrix-sdk-base", - "dep:matrix-sdk-crypto", - "matrix-sdk-base?/e2e-encryption", -] - -docsrs = [ - "crypto-store", -] - -[dependencies] -async-trait = { workspace = true } -fs_extra = "1.2.0" -futures-core = { workspace = true } -futures-util = { workspace = true } -matrix-sdk-base = { version = "0.6.0", path = "../matrix-sdk-base", optional = true } -matrix-sdk-crypto = { version = "0.6.0", path = "../matrix-sdk-crypto", optional = true } -matrix-sdk-store-encryption = { version = "0.2.0", path = "../matrix-sdk-store-encryption" } -ruma = { workspace = true } -serde = { workspace = true } -serde_json = { workspace = true } -sled = "0.34.7" -thiserror = { workspace = true } -tokio = { workspace = true, features = ["fs"] } -tracing = { workspace = true } - -[dev-dependencies] -assert_matches = { workspace = true } -glob = "0.3.0" -matrix-sdk-base = { path = "../matrix-sdk-base", features = ["testing"] } -matrix-sdk-crypto = { path = "../matrix-sdk-crypto", features = ["testing"] } -matrix-sdk-test = { path = "../../testing/matrix-sdk-test" } -once_cell = { workspace = true } -tempfile = "3.3.0" -tokio = { workspace = true, features = ["rt-multi-thread", "macros"] } diff --git a/crates/matrix-sdk-sled/README.md b/crates/matrix-sdk-sled/README.md deleted file mode 100644 index 2669205bd..000000000 --- a/crates/matrix-sdk-sled/README.md +++ /dev/null @@ -1,25 +0,0 @@ -# matrix-sdk-sled - -This crate implements a storage backend using [sled][sled] for native and mobile environments using the matrix-sdk-base primitives. When using **matrix-sdk** this is included by default. - -_Note_: the future of [sled][sled] is unclear. While it is currently the default for mobile and native environments for matrix-rust-sdk, [we are actively looking at replacing it with a different storage backend](https://github.com/matrix-org/matrix-rust-sdk/issues/294). - - -## Crate Feature Flags - -The following crate feature flags are available: - -* `state-store`: (on by default) Enables the state store -* `crypto-store`: Enables the store for end-to-end encrypted data. - - -## Minimum Supported Rust Version (MSRV) - -These crates are built with the Rust language version 2021 and require a minimum compiler version of `1.62`. - -## License - -[Apache-2.0](https://www.apache.org/licenses/LICENSE-2.0) - - -[sled]: https://sled.rs/ diff --git a/crates/matrix-sdk-sled/build.rs b/crates/matrix-sdk-sled/build.rs deleted file mode 100644 index ea5f5a045..000000000 --- a/crates/matrix-sdk-sled/build.rs +++ /dev/null @@ -1,17 +0,0 @@ -use std::{env, process}; - -fn main() { - let target_arch = env::var_os("CARGO_CFG_TARGET_ARCH"); - if target_arch.map_or(false, |arch| arch == "wasm32") { - let err = "this crate does not support the target arch 'wasm32'"; - eprintln!( - "\n\ - ┏━━━━━━━━{pad}━┓\n\ - ┃ error: {err} ┃\n\ - ┗━━━━━━━━{pad}━┛\n\ - ", - pad = "━".repeat(err.len()), - ); - process::exit(1); - } -} diff --git a/crates/matrix-sdk-sled/src/crypto_store.rs b/crates/matrix-sdk-sled/src/crypto_store.rs deleted file mode 100644 index 9c083505f..000000000 --- a/crates/matrix-sdk-sled/src/crypto_store.rs +++ /dev/null @@ -1,1151 +0,0 @@ -// Copyright 2020 The Matrix.org Foundation C.I.C. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::{ - borrow::Cow, - collections::HashMap, - path::{Path, PathBuf}, - sync::{Arc, RwLock}, -}; - -use async_trait::async_trait; -use matrix_sdk_crypto::{ - olm::{ - IdentityKeys, InboundGroupSession, OutboundGroupSession, PickledInboundGroupSession, - PrivateCrossSigningIdentity, Session, - }, - store::{ - caches::SessionStore, BackupKeys, Changes, CryptoStore, CryptoStoreError, Result, - RoomKeyCounts, RoomSettings, - }, - types::{ - events::{room_key_request::SupportedKeyInfo, room_key_withheld::RoomKeyWithheldEvent}, - EventEncryptionAlgorithm, - }, - GossipRequest, ReadOnlyAccount, ReadOnlyDevice, ReadOnlyUserIdentities, SecretInfo, - TrackedUser, -}; -use matrix_sdk_store_encryption::StoreCipher; -use ruma::{DeviceId, OwnedDeviceId, OwnedUserId, RoomId, TransactionId, UserId}; -use serde::{de::DeserializeOwned, Serialize}; -use sled::{ - transaction::{ConflictableTransactionError, TransactionError}, - Batch, Config, Db, IVec, Transactional, Tree, -}; -use tokio::sync::Mutex; -use tracing::debug; - -use super::OpenStoreError; -use crate::encode_key::{EncodeKey, ENCODE_SEPARATOR}; - -const DATABASE_VERSION: u8 = 7; - -// Table names that are used to derive a separate key for each tree. This ensure -// that user ids encoded for different trees won't end up as the same byte -// sequence. This prevents corelation attacks on our tree metadata. -const DEVICE_TABLE_NAME: &str = "crypto-store-devices"; -const IDENTITIES_TABLE_NAME: &str = "crypto-store-identities"; -const SESSIONS_TABLE_NAME: &str = "crypto-store-sessions"; -const INBOUND_GROUP_TABLE_NAME: &str = "crypto-store-inbound-group-sessions"; -const OUTBOUND_GROUP_TABLE_NAME: &str = "crypto-store-outbound-group-sessions"; -const SECRET_REQUEST_BY_INFO_TABLE: &str = "crypto-store-secret-request-by-info"; -const TRACKED_USERS_TABLE: &str = "crypto-store-secret-tracked-users"; -const DIRECT_WITHHELD_INFO_TABLE: &str = "crypto-store-direct-withheld-info"; -const NO_OLM_SENT_TABLE: &str = "crypto-store-no-olm-sent"; -const ROOM_SETTINGS_TABLE: &str = "crypto-store-secret-room-settings"; - -impl EncodeKey for InboundGroupSession { - fn encode(&self) -> Vec { - (self.room_id(), self.session_id()).encode() - } - - fn encode_secure(&self, table_name: &str, store_cipher: &StoreCipher) -> Vec { - (self.room_id(), self.session_id()).encode_secure(table_name, store_cipher) - } -} - -impl EncodeKey for OutboundGroupSession { - fn encode(&self) -> Vec { - self.room_id().encode() - } - fn encode_secure(&self, table_name: &str, store_cipher: &StoreCipher) -> Vec { - self.room_id().encode_secure(table_name, store_cipher) - } -} - -impl EncodeKey for Session { - fn encode(&self) -> Vec { - let sender_key = self.sender_key().to_base64(); - let session_id = self.session_id(); - - [sender_key.as_bytes(), &[ENCODE_SEPARATOR], session_id.as_bytes(), &[ENCODE_SEPARATOR]] - .concat() - } - - fn encode_secure(&self, table_name: &str, store_cipher: &StoreCipher) -> Vec { - let sender_key = - store_cipher.hash_key(table_name, self.sender_key().to_base64().as_bytes()); - let session_id = store_cipher.hash_key(table_name, self.session_id().as_bytes()); - - [sender_key.as_slice(), &[ENCODE_SEPARATOR], session_id.as_slice(), &[ENCODE_SEPARATOR]] - .concat() - } -} - -impl EncodeKey for SecretInfo { - fn encode(&self) -> Vec { - match self { - SecretInfo::KeyRequest(k) => k.encode(), - SecretInfo::SecretRequest(s) => s.encode(), - } - } - fn encode_secure(&self, table_name: &str, store_cipher: &StoreCipher) -> Vec { - match self { - SecretInfo::KeyRequest(k) => k.encode_secure(table_name, store_cipher), - SecretInfo::SecretRequest(s) => s.encode_secure(table_name, store_cipher), - } - } -} - -impl EncodeKey for EventEncryptionAlgorithm { - fn encode_as_bytes(&self) -> Cow<'_, [u8]> { - let s: &str = self.as_ref(); - s.as_bytes().into() - } -} - -impl EncodeKey for SupportedKeyInfo { - fn encode(&self) -> Vec { - (self.room_id(), &self.algorithm(), self.session_id()).encode() - } - fn encode_secure(&self, table_name: &str, store_cipher: &StoreCipher) -> Vec { - let room_id = store_cipher.hash_key(table_name, self.room_id().as_bytes()); - let algorithm = store_cipher.hash_key(table_name, self.algorithm().as_ref().as_bytes()); - let session_id = store_cipher.hash_key(table_name, self.session_id().as_bytes()); - - [ - room_id.as_slice(), - &[ENCODE_SEPARATOR], - algorithm.as_slice(), - &[ENCODE_SEPARATOR], - session_id.as_slice(), - &[ENCODE_SEPARATOR], - ] - .concat() - } -} - -impl EncodeKey for ReadOnlyDevice { - fn encode(&self) -> Vec { - (self.user_id(), self.device_id()).encode() - } - fn encode_secure(&self, table_name: &str, store_cipher: &StoreCipher) -> Vec { - (self.user_id(), self.device_id()).encode_secure(table_name, store_cipher) - } -} - -#[derive(Clone, Debug)] -pub struct AccountInfo { - user_id: OwnedUserId, - device_id: OwnedDeviceId, - identity_keys: Arc, -} - -/// A [sled] based cryptostore. -/// -/// [sled]: https://github.com/spacejam/sled#readme -#[derive(Clone)] -pub struct SledCryptoStore { - account_info: Arc>>, - - store_cipher: Option>, - path: Option, - inner: Db, - - session_cache: SessionStore, - - account: Tree, - private_identity: Tree, - - olm_hashes: Tree, - sessions: Tree, - inbound_group_sessions: Tree, - outbound_group_sessions: Tree, - - outgoing_secret_requests: Tree, - unsent_secret_requests: Tree, - secret_requests_by_info: Tree, - - devices: Tree, - identities: Tree, - - tracked_users: Tree, - - direct_withheld_info: Tree, - no_olm_sent: Tree, - room_settings: Tree, -} - -impl std::fmt::Debug for SledCryptoStore { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - if let Some(path) = &self.path { - f.debug_struct("SledCryptoStore").field("path", &path).finish() - } else { - f.debug_struct("SledCryptoStore").field("path", &"memory store").finish() - } - } -} - -impl SledCryptoStore { - /// Open the sled-based crypto store at the given path using the given - /// passphrase to encrypt private data. - pub async fn open( - path: impl AsRef, - passphrase: Option<&str>, - ) -> Result { - let path = path.as_ref().join("matrix-sdk-crypto"); - let db = - Config::new().temporary(false).path(&path).open().map_err(CryptoStoreError::backend)?; - - Self::open_with_database(db, passphrase).await - } - - /// Create a sled-based crypto store using the given sled database. - /// The given passphrase will be used to encrypt private data. - pub async fn open_with_database( - db: Db, - passphrase: Option<&str>, - ) -> Result { - let store_cipher = passphrase - .map(|p| Self::get_or_create_store_cipher(p, &db)) - .transpose()? - .map(Into::into); - - SledCryptoStore::open_helper(db, None, store_cipher).await - } - - fn get_account_info(&self) -> Option { - self.account_info.read().unwrap().clone() - } - - fn serialize_value(&self, event: &impl Serialize) -> Result, CryptoStoreError> { - if let Some(key) = &self.store_cipher { - key.encrypt_value(event).map_err(CryptoStoreError::backend) - } else { - Ok(serde_json::to_vec(event)?) - } - } - - fn deserialize_value(&self, event: &[u8]) -> Result { - if let Some(key) = &self.store_cipher { - key.decrypt_value(event).map_err(CryptoStoreError::backend) - } else { - Ok(serde_json::from_slice(event)?) - } - } - - fn encode_key(&self, table_name: &str, key: T) -> Vec { - if let Some(store_cipher) = &self.store_cipher { - key.encode_secure(table_name, store_cipher).to_vec() - } else { - key.encode() - } - } - - async fn reset_backup_state(&self) -> Result<()> { - let mut pickles: Vec<(IVec, PickledInboundGroupSession)> = self - .inbound_group_sessions - .iter() - .map(|p| { - let item = p.map_err(CryptoStoreError::backend)?; - Ok((item.0, self.deserialize_value(&item.1)?)) - }) - .collect::>()?; - - for (_, pickle) in &mut pickles { - pickle.backed_up = false; - } - - let ret: Result<(), TransactionError> = - self.inbound_group_sessions.transaction(|inbound_sessions| { - for (key, pickle) in &pickles { - inbound_sessions.insert( - key, - self.serialize_value(pickle) - .map_err(ConflictableTransactionError::Abort)?, - )?; - } - - Ok(()) - }); - - ret.map_err(CryptoStoreError::backend)?; - - self.inner.flush_async().await.map_err(CryptoStoreError::backend)?; - - Ok(()) - } - - async fn upgrade(&self) -> Result<()> { - let version = self - .inner - .get("store_version") - .map_err(CryptoStoreError::backend)? - .map(|v| { - let (version_bytes, _) = v.split_at(std::mem::size_of::()); - u8::from_be_bytes(version_bytes.try_into().unwrap_or_default()) - }) - .unwrap_or(DATABASE_VERSION); - - if version != DATABASE_VERSION { - debug!(version, new_version = DATABASE_VERSION, "Upgrading the Sled crypto store"); - } - - if version <= 3 { - return Err(CryptoStoreError::UnsupportedDatabaseVersion( - version.into(), - DATABASE_VERSION.into(), - )); - } - - if version <= 4 { - // Room key requests are not that important, if they are needed they - // will be sent out again. So let's drop all of them since we - // removed the `sender_key` from the hash key. - self.outgoing_secret_requests.clear().map_err(CryptoStoreError::backend)?; - self.unsent_secret_requests.clear().map_err(CryptoStoreError::backend)?; - self.secret_requests_by_info.clear().map_err(CryptoStoreError::backend)?; - } - - if version <= 5 { - // We changed how we store inbound group sessions, the key used to - // be a trippled of `(room_id, sender_key, session_id)` now it's a - // tuple of `(room_id, session_id)` - // - // This method gets all the sessions and doesn't touch the key. - let inbound_group_sessions = self.get_inbound_group_sessions().await?; - - // Clear the tree that stores the inbound group sessions. - self.inbound_group_sessions.clear().map_err(CryptoStoreError::backend)?; - - // Save all the sessions now, this will store them using the new - // tuple as the key. - let changes = Changes { inbound_group_sessions, ..Default::default() }; - self.save_changes(changes).await?; - } - - if version < 7 { - // we have changed the way outbound group session are stored - // just clear - self.outbound_group_sessions.clear().map_err(CryptoStoreError::backend)?; - } - - self.inner - .insert("store_version", DATABASE_VERSION.to_be_bytes().as_ref()) - .map_err(CryptoStoreError::backend)?; - self.inner.flush().map_err(CryptoStoreError::backend)?; - - Ok(()) - } - - fn get_or_create_store_cipher(passphrase: &str, database: &Db) -> Result { - let cipher = if let Some(key) = - database.get("store_cipher".encode()).map_err(CryptoStoreError::backend)? - { - StoreCipher::import(passphrase, &key).map_err(|_| CryptoStoreError::UnpicklingError)? - } else { - let cipher = StoreCipher::new().map_err(CryptoStoreError::backend)?; - #[cfg(not(test))] - let export = cipher.export(passphrase); - #[cfg(test)] - let export = cipher._insecure_export_fast_for_testing(passphrase); - database - .insert("store_cipher".encode(), export.map_err(CryptoStoreError::backend)?) - .map_err(CryptoStoreError::backend)?; - cipher - }; - - Ok(cipher) - } - - pub(crate) async fn open_helper( - db: Db, - path: Option, - store_cipher: Option>, - ) -> Result { - let account = db.open_tree("account")?; - let private_identity = db.open_tree("private_identity")?; - - let sessions = db.open_tree("session")?; - let inbound_group_sessions = db.open_tree("inbound_group_sessions")?; - - let outbound_group_sessions = db.open_tree("outbound_group_sessions")?; - - let tracked_users = db.open_tree("tracked_users")?; - let olm_hashes = db.open_tree("olm_hashes")?; - - let devices = db.open_tree("devices")?; - let identities = db.open_tree("identities")?; - - let outgoing_secret_requests = db.open_tree("outgoing_secret_requests")?; - let unsent_secret_requests = db.open_tree("unsent_secret_requests")?; - let secret_requests_by_info = db.open_tree("secret_requests_by_info")?; - - let room_settings = db.open_tree("room_settings")?; - - let session_cache = SessionStore::new(); - - let direct_withheld_info = db.open_tree("direct_withheld_info")?; - let no_olm_sent = db.open_tree("no_olm_sent")?; - - let database = Self { - account_info: RwLock::new(None).into(), - path, - inner: db, - store_cipher, - account, - private_identity, - sessions, - session_cache, - inbound_group_sessions, - outbound_group_sessions, - outgoing_secret_requests, - unsent_secret_requests, - secret_requests_by_info, - devices, - tracked_users, - olm_hashes, - identities, - direct_withheld_info, - no_olm_sent, - room_settings, - }; - - database.upgrade().await?; - - Ok(database) - } - - async fn load_tracked_users(&self) -> Result> { - let mut users = Vec::new(); - - for value in &self.tracked_users { - let (_, user) = value.map_err(CryptoStoreError::backend)?; - let user: TrackedUser = self.deserialize_value(&user)?; - - users.push(user) - } - - Ok(users) - } - - async fn load_outbound_group_session( - &self, - room_id: &RoomId, - ) -> Result> { - let account_info = self.get_account_info().ok_or(CryptoStoreError::AccountUnset)?; - - self.outbound_group_sessions - .get(self.encode_key(OUTBOUND_GROUP_TABLE_NAME, room_id)) - .map_err(CryptoStoreError::backend)? - .map(|p| self.deserialize_value(&p)) - .transpose()? - .map(|p| { - Ok(OutboundGroupSession::from_pickle( - account_info.device_id, - account_info.identity_keys, - p, - )?) - }) - .transpose() - } - - async fn save_changes(&self, changes: Changes) -> Result<()> { - let account_pickle = if let Some(account) = changes.account { - let account_info = AccountInfo { - user_id: account.user_id.clone(), - device_id: account.device_id.clone(), - identity_keys: account.identity_keys.clone(), - }; - - *self.account_info.write().unwrap() = Some(account_info); - Some(account.pickle().await) - } else { - None - }; - - let private_identity_pickle = - if let Some(i) = changes.private_identity { Some(i.pickle().await) } else { None }; - - let recovery_key_pickle = changes.recovery_key; - - let device_changes = changes.devices; - let mut session_changes = HashMap::new(); - let mut no_olm_to_clear = Vec::new(); - - for session in changes.sessions { - let user_id = session.user_id.as_str(); - let device_id = session.device_id.as_str(); - let olm_key = self.encode_key(NO_OLM_SENT_TABLE, (user_id, device_id)); - - no_olm_to_clear.push(olm_key); - - let pickle = session.pickle().await; - let key = self.encode_key(SESSIONS_TABLE_NAME, &session); - - self.session_cache.add(session).await; - session_changes.insert(key, pickle); - } - - let mut inbound_session_changes = HashMap::new(); - - for session in changes.inbound_group_sessions { - let key = self.encode_key(INBOUND_GROUP_TABLE_NAME, &session); - let pickle = session.pickle().await; - - inbound_session_changes.insert(key, pickle); - } - - let mut outbound_session_changes = HashMap::new(); - - for session in changes.outbound_group_sessions { - let key = self.encode_key(OUTBOUND_GROUP_TABLE_NAME, &session); - let pickle = session.pickle().await; - - outbound_session_changes.insert(key, pickle); - } - - let identity_changes = changes.identities; - let olm_hashes = changes.message_hashes; - let key_requests = changes.key_requests; - let backup_version = changes.backup_version; - let room_settings_changes = changes.room_settings; - - let ret: Result<(), TransactionError> = ( - &self.account, - &self.private_identity, - &self.devices, - &self.identities, - &self.sessions, - &self.inbound_group_sessions, - &self.outbound_group_sessions, - &self.olm_hashes, - &self.outgoing_secret_requests, - &self.unsent_secret_requests, - &self.secret_requests_by_info, - &self.direct_withheld_info, - &self.no_olm_sent, - &self.room_settings, - ) - .transaction( - |( - account, - private_identity, - devices, - identities, - sessions, - inbound_sessions, - outbound_sessions, - hashes, - outgoing_secret_requests, - unsent_secret_requests, - secret_requests_by_info, - direct_withheld_info, - no_olm_sent, - room_settings, - )| { - if let Some(a) = &account_pickle { - account.insert( - "account".encode(), - self.serialize_value(a).map_err(ConflictableTransactionError::Abort)?, - )?; - } - - if let Some(i) = &private_identity_pickle { - private_identity.insert( - "identity".encode(), - self.serialize_value(&i) - .map_err(ConflictableTransactionError::Abort)?, - )?; - } - - if let Some(r) = &recovery_key_pickle { - account.insert( - "recovery_key_v1".encode(), - self.serialize_value(r).map_err(ConflictableTransactionError::Abort)?, - )?; - } - - if let Some(b) = &backup_version { - account.insert( - "backup_version_v1".encode(), - self.serialize_value(b).map_err(ConflictableTransactionError::Abort)?, - )?; - } - - for device in device_changes.new.iter().chain(&device_changes.changed) { - let key = self.encode_key(DEVICE_TABLE_NAME, device); - let device = self - .serialize_value(&device) - .map_err(ConflictableTransactionError::Abort)?; - devices.insert(key, device)?; - } - - for device in &device_changes.deleted { - let key = self.encode_key(DEVICE_TABLE_NAME, device); - devices.remove(key)?; - } - - for identity in identity_changes.changed.iter().chain(&identity_changes.new) { - identities.insert( - self.encode_key(IDENTITIES_TABLE_NAME, identity.user_id()), - self.serialize_value(&identity) - .map_err(ConflictableTransactionError::Abort)?, - )?; - } - - for (key, session) in &session_changes { - sessions.insert( - key.as_slice(), - self.serialize_value(&session) - .map_err(ConflictableTransactionError::Abort)?, - )?; - } - - for (key, session) in &inbound_session_changes { - inbound_sessions.insert( - key.as_slice(), - self.serialize_value(&session) - .map_err(ConflictableTransactionError::Abort)?, - )?; - } - - for (key, session) in &outbound_session_changes { - outbound_sessions.insert( - key.as_slice(), - self.serialize_value(&session) - .map_err(ConflictableTransactionError::Abort)?, - )?; - } - - for hash in &olm_hashes { - hashes.insert( - serde_json::to_vec(hash) - .map_err(CryptoStoreError::Serialization) - .map_err(ConflictableTransactionError::Abort)?, - &[0], - )?; - } - - for key_request in &key_requests { - secret_requests_by_info.insert( - self.encode_key(SECRET_REQUEST_BY_INFO_TABLE, &key_request.info), - key_request.request_id.encode(), - )?; - - let key_request_id = key_request.request_id.encode(); - - if key_request.sent_out { - unsent_secret_requests.remove(key_request_id.clone())?; - outgoing_secret_requests.insert( - key_request_id, - self.serialize_value(&key_request) - .map_err(ConflictableTransactionError::Abort)?, - )?; - } else { - outgoing_secret_requests.remove(key_request_id.clone())?; - unsent_secret_requests.insert( - key_request_id, - self.serialize_value(&key_request) - .map_err(ConflictableTransactionError::Abort)?, - )?; - } - } - - for (room_id, data) in &changes.withheld_session_info { - for (session_id, event) in data { - let key = - self.encode_key(DIRECT_WITHHELD_INFO_TABLE, (session_id, room_id)); - direct_withheld_info.insert( - key, - self.serialize_value(&event) - .map_err(ConflictableTransactionError::Abort)?, - )?; - } - } - - for (room_id, settings) in &room_settings_changes { - let key = self.encode_key(ROOM_SETTINGS_TABLE, room_id); - room_settings.insert( - key.as_slice(), - self.serialize_value(&settings) - .map_err(ConflictableTransactionError::Abort)?, - )?; - } - - for key in &no_olm_to_clear { - no_olm_sent.remove(key.clone())?; - } - - Ok(()) - }, - ); - - ret.map_err(CryptoStoreError::backend)?; - self.inner.flush().map_err(CryptoStoreError::backend)?; - - Ok(()) - } - - async fn get_outgoing_key_request_helper(&self, id: &[u8]) -> Result> { - let request = self - .outgoing_secret_requests - .get(id) - .map_err(CryptoStoreError::backend)? - .map(|r| self.deserialize_value(&r)) - .transpose()?; - - let request = if request.is_none() { - self.unsent_secret_requests - .get(id) - .map_err(CryptoStoreError::backend)? - .map(|r| self.deserialize_value(&r)) - .transpose()? - } else { - request - }; - - Ok(request) - } - - /// Save a batch of tracked users. - /// - /// # Arguments - /// - /// * `tracked_users` - A list of tuples. The first element of the tuple is - /// the user ID, the second element is if the user should be considered to - /// be dirty. - pub async fn save_tracked_users( - &self, - tracked_users: &[(&UserId, bool)], - ) -> Result<(), CryptoStoreError> { - let users: Vec = tracked_users - .iter() - .map(|(u, d)| TrackedUser { user_id: (*u).into(), dirty: *d }) - .collect(); - - let mut batch = Batch::default(); - - for user in users { - batch.insert( - self.encode_key(TRACKED_USERS_TABLE, user.user_id.as_str()), - self.serialize_value(&user)?, - ); - } - - self.tracked_users.apply_batch(batch).map_err(CryptoStoreError::backend) - } -} - -#[async_trait] -impl CryptoStore for SledCryptoStore { - type Error = CryptoStoreError; - - async fn load_account(&self) -> Result> { - if let Some(pickle) = - self.account.get("account".encode()).map_err(CryptoStoreError::backend)? - { - let pickle = self.deserialize_value(&pickle)?; - - let account = ReadOnlyAccount::from_pickle(pickle)?; - - let account_info = AccountInfo { - user_id: account.user_id.clone(), - device_id: account.device_id.clone(), - identity_keys: account.identity_keys.clone(), - }; - - *self.account_info.write().unwrap() = Some(account_info); - - Ok(Some(account)) - } else { - Ok(None) - } - } - - async fn save_account(&self, account: ReadOnlyAccount) -> Result<()> { - self.save_changes(Changes { account: Some(account), ..Default::default() }).await - } - - async fn load_identity(&self) -> Result> { - if let Some(i) = - self.private_identity.get("identity".encode()).map_err(CryptoStoreError::backend)? - { - let pickle = self.deserialize_value(&i)?; - Ok(Some( - PrivateCrossSigningIdentity::from_pickle(pickle) - .await - .map_err(|_| CryptoStoreError::UnpicklingError)?, - )) - } else { - Ok(None) - } - } - - async fn save_changes(&self, changes: Changes) -> Result<()> { - self.save_changes(changes).await - } - - async fn get_sessions(&self, sender_key: &str) -> Result>>>> { - let account_info = self.get_account_info().ok_or(CryptoStoreError::AccountUnset)?; - - if self.session_cache.get(sender_key).is_none() { - let sessions: Result> = self - .sessions - .scan_prefix(self.encode_key(SESSIONS_TABLE_NAME, sender_key)) - .map(|s| self.deserialize_value(&s.map_err(CryptoStoreError::backend)?.1)) - .map(|p| { - Ok(Session::from_pickle( - account_info.user_id.clone(), - account_info.device_id.clone(), - account_info.identity_keys.clone(), - p?, - )) - }) - .collect(); - - self.session_cache.set_for_sender(sender_key, sessions?); - } - - Ok(self.session_cache.get(sender_key)) - } - - async fn get_inbound_group_session( - &self, - room_id: &RoomId, - session_id: &str, - ) -> Result> { - let key = self.encode_key(INBOUND_GROUP_TABLE_NAME, (room_id, session_id)); - let pickle = self - .inbound_group_sessions - .get(key) - .map_err(CryptoStoreError::backend)? - .map(|p| self.deserialize_value(&p)); - - if let Some(pickle) = pickle { - Ok(Some(InboundGroupSession::from_pickle(pickle?)?)) - } else { - Ok(None) - } - } - - async fn get_inbound_group_sessions(&self) -> Result> { - let pickles: Result> = self - .inbound_group_sessions - .iter() - .map(|p| self.deserialize_value(&p.map_err(CryptoStoreError::backend)?.1)) - .collect(); - - Ok(pickles?.into_iter().filter_map(|p| InboundGroupSession::from_pickle(p).ok()).collect()) - } - - async fn inbound_group_session_counts(&self) -> Result { - let pickles: Vec = self - .inbound_group_sessions - .iter() - .map(|p| { - let item = p.map_err(CryptoStoreError::backend)?; - self.deserialize_value(&item.1) - }) - .collect::>()?; - - let total = pickles.len(); - let backed_up = pickles.into_iter().filter(|p| p.backed_up).count(); - - Ok(RoomKeyCounts { total, backed_up }) - } - - async fn inbound_group_sessions_for_backup( - &self, - limit: usize, - ) -> Result> { - let pickles: Vec = self - .inbound_group_sessions - .iter() - .map(|p| { - let item = p.map_err(CryptoStoreError::backend)?; - self.deserialize_value(&item.1) - }) - .filter_map(|p: Result| match p { - Ok(p) => { - if !p.backed_up { - Some(InboundGroupSession::from_pickle(p).map_err(CryptoStoreError::from)) - } else { - None - } - } - - Err(p) => Some(Err(p)), - }) - .take(limit) - .collect::>()?; - - Ok(pickles) - } - - async fn reset_backup_state(&self) -> Result<()> { - self.reset_backup_state().await - } - - async fn get_outbound_group_session( - &self, - room_id: &RoomId, - ) -> Result> { - self.load_outbound_group_session(room_id).await - } - - async fn load_tracked_users(&self) -> Result> { - self.load_tracked_users().await - } - - async fn save_tracked_users(&self, users: &[(&UserId, bool)]) -> Result<()> { - self.save_tracked_users(users).await?; - - Ok(()) - } - - async fn get_device( - &self, - user_id: &UserId, - device_id: &DeviceId, - ) -> Result> { - let key = self.encode_key(DEVICE_TABLE_NAME, (user_id, device_id)); - - Ok(self - .devices - .get(key) - .map_err(CryptoStoreError::backend)? - .map(|d| self.deserialize_value(&d)) - .transpose()?) - } - - async fn get_user_devices( - &self, - user_id: &UserId, - ) -> Result> { - let key = self.encode_key(DEVICE_TABLE_NAME, user_id); - self.devices - .scan_prefix(key) - .map(|d| self.deserialize_value(&d.map_err(CryptoStoreError::backend)?.1)) - .map(|d| { - let d: ReadOnlyDevice = d?; - Ok((d.device_id().to_owned(), d)) - }) - .collect() - } - - async fn get_user_identity(&self, user_id: &UserId) -> Result> { - let key = self.encode_key(IDENTITIES_TABLE_NAME, user_id); - - Ok(self - .identities - .get(key) - .map_err(CryptoStoreError::backend)? - .map(|i| self.deserialize_value(&i)) - .transpose()?) - } - - async fn is_message_known( - &self, - message_hash: &matrix_sdk_crypto::olm::OlmMessageHash, - ) -> Result { - Ok(self - .olm_hashes - .contains_key(serde_json::to_vec(message_hash)?) - .map_err(CryptoStoreError::backend)?) - } - - async fn get_outgoing_secret_requests( - &self, - request_id: &TransactionId, - ) -> Result> { - let request_id = request_id.encode(); - - self.get_outgoing_key_request_helper(&request_id).await - } - - async fn get_secret_request_by_info( - &self, - key_info: &SecretInfo, - ) -> Result> { - let id = self - .secret_requests_by_info - .get(self.encode_key(SECRET_REQUEST_BY_INFO_TABLE, key_info)) - .map_err(CryptoStoreError::backend)?; - - if let Some(id) = id { - self.get_outgoing_key_request_helper(&id).await - } else { - Ok(None) - } - } - - async fn get_unsent_secret_requests(&self) -> Result> { - let requests: Result> = self - .unsent_secret_requests - .iter() - .map(|i| self.deserialize_value(&i.map_err(CryptoStoreError::backend)?.1)) - .collect(); - - requests - } - - async fn delete_outgoing_secret_requests(&self, request_id: &TransactionId) -> Result<()> { - let ret: Result<(), TransactionError> = ( - &self.outgoing_secret_requests, - &self.unsent_secret_requests, - &self.secret_requests_by_info, - ) - .transaction( - |(outgoing_key_requests, unsent_key_requests, key_requests_by_info)| { - let sent_request: Option = outgoing_key_requests - .remove(request_id.encode())? - .map(|r| self.deserialize_value(&r)) - .transpose() - .map_err(ConflictableTransactionError::Abort)?; - - let unsent_request: Option = unsent_key_requests - .remove(request_id.encode())? - .map(|r| self.deserialize_value(&r)) - .transpose() - .map_err(ConflictableTransactionError::Abort)?; - - if let Some(request) = sent_request { - key_requests_by_info - .remove(self.encode_key(SECRET_REQUEST_BY_INFO_TABLE, &request.info))?; - } - - if let Some(request) = unsent_request { - key_requests_by_info - .remove(self.encode_key(SECRET_REQUEST_BY_INFO_TABLE, &request.info))?; - } - - Ok(()) - }, - ); - - ret.map_err(CryptoStoreError::backend)?; - self.inner.flush_async().await.map_err(CryptoStoreError::backend)?; - - Ok(()) - } - - async fn load_backup_keys(&self) -> Result { - let key = { - let backup_version = self - .account - .get("backup_version_v1".encode()) - .map_err(CryptoStoreError::backend)? - .map(|v| self.deserialize_value(&v)) - .transpose()?; - - let recovery_key = { - self.account - .get("recovery_key_v1".encode()) - .map_err(CryptoStoreError::backend)? - .map(|p| self.deserialize_value(&p)) - .transpose()? - }; - - BackupKeys { backup_version, recovery_key } - }; - - Ok(key) - } - - async fn get_withheld_info( - &self, - room_id: &RoomId, - session_id: &str, - ) -> Result, Self::Error> { - let key = self.encode_key(DIRECT_WITHHELD_INFO_TABLE, (session_id, room_id.as_str())); - Ok(self - .direct_withheld_info - .get(key) - .map_err(CryptoStoreError::backend)? - .map(|d| self.deserialize_value::(&d)) - .transpose()?) - } - - async fn get_room_settings(&self, room_id: &RoomId) -> Result> { - let key = self.encode_key(ROOM_SETTINGS_TABLE, room_id); - self.room_settings - .get(key) - .map_err(CryptoStoreError::backend)? - .map(|p| self.deserialize_value(&p)) - .transpose() - } - - async fn get_custom_value(&self, key: &str) -> Result>> { - let value = self.inner.get(key).map_err(CryptoStoreError::backend)?.map(|v| v.to_vec()); - Ok(value) - } - - async fn set_custom_value(&self, key: &str, value: Vec) -> Result<()> { - self.inner.insert(key, value).map_err(CryptoStoreError::backend)?; - Ok(()) - } -} - -#[cfg(test)] -mod tests { - use matrix_sdk_crypto::cryptostore_integration_tests; - use once_cell::sync::Lazy; - use tempfile::{tempdir, TempDir}; - - use super::SledCryptoStore; - - static TMP_DIR: Lazy = Lazy::new(|| tempdir().unwrap()); - - async fn get_store(name: &str, passphrase: Option<&str>) -> SledCryptoStore { - let tmpdir_path = TMP_DIR.path().join(name); - - SledCryptoStore::open(tmpdir_path.to_str().unwrap(), passphrase) - .await - .expect("Can't create a passphrase protected store") - } - - cryptostore_integration_tests!(); -} - -#[cfg(test)] -mod encrypted_tests { - use matrix_sdk_crypto::cryptostore_integration_tests; - use once_cell::sync::Lazy; - use tempfile::{tempdir, TempDir}; - - use super::SledCryptoStore; - - static TMP_DIR: Lazy = Lazy::new(|| tempdir().unwrap()); - - async fn get_store(name: &str, passphrase: Option<&str>) -> SledCryptoStore { - let tmpdir_path = TMP_DIR.path().join(name); - let pass = passphrase.unwrap_or("default_test_password"); - - SledCryptoStore::open(tmpdir_path.to_str().unwrap(), Some(pass)) - .await - .expect("Can't create a passphrase protected store") - } - cryptostore_integration_tests!(); -} diff --git a/crates/matrix-sdk-sled/src/encode_key.rs b/crates/matrix-sdk-sled/src/encode_key.rs deleted file mode 100644 index 7b4a20c78..000000000 --- a/crates/matrix-sdk-sled/src/encode_key.rs +++ /dev/null @@ -1,290 +0,0 @@ -use std::{borrow::Cow, ops::Deref}; - -use matrix_sdk_store_encryption::StoreCipher; -use ruma::{ - events::{ - receipt::ReceiptType, secret::request::SecretName, GlobalAccountDataEventType, - RoomAccountDataEventType, StateEventType, - }, - DeviceId, EventId, MxcUri, OwnedEventId, OwnedRoomId, OwnedUserId, RoomId, TransactionId, - UserId, -}; - -/// Hold any data to be used as an encoding key -/// without checking for the existence of `ENCODE_SEPARATOR` within -pub struct EncodeUnchecked<'a>(&'a [u8]); - -#[cfg(feature = "state-store")] -impl<'a> EncodeUnchecked<'a> { - /// Wrap any `[u8]` - pub fn from(bytes: &'a [u8]) -> Self { - EncodeUnchecked(bytes) - } -} - -impl<'a> EncodeKey for EncodeUnchecked<'a> { - fn encode_as_bytes(&self) -> Cow<'a, [u8]> { - (self.0).into() - } -} - -pub const ENCODE_SEPARATOR: u8 = 0xff; - -pub trait EncodeKey { - fn encode_as_bytes(&self) -> Cow<'_, [u8]> { - unimplemented!() - } - - fn encode(&self) -> Vec { - [self.encode_as_bytes().deref(), &[ENCODE_SEPARATOR]].concat() - } - fn encode_secure(&self, table_name: &str, store_cipher: &StoreCipher) -> Vec { - let key = store_cipher.hash_key(table_name, &self.encode_as_bytes()); - [key.as_slice(), &[ENCODE_SEPARATOR]].concat() - } -} - -impl EncodeKey for &T { - fn encode_as_bytes(&self) -> Cow<'_, [u8]> { - T::encode_as_bytes(self) - } - fn encode(&self) -> Vec { - T::encode(self) - } - fn encode_secure(&self, table_name: &str, store_cipher: &StoreCipher) -> Vec { - T::encode_secure(self, table_name, store_cipher) - } -} - -impl EncodeKey for str { - fn encode_as_bytes(&self) -> Cow<'_, [u8]> { - self.as_bytes().into() - } -} - -impl EncodeKey for String { - fn encode_as_bytes(&self) -> Cow<'_, [u8]> { - self.as_str().as_bytes().into() - } -} - -impl EncodeKey for DeviceId { - fn encode_as_bytes(&self) -> Cow<'_, [u8]> { - self.as_str().as_bytes().into() - } -} - -impl EncodeKey for EventId { - fn encode_as_bytes(&self) -> Cow<'_, [u8]> { - self.as_str().as_bytes().into() - } -} - -impl EncodeKey for OwnedEventId { - fn encode_as_bytes(&self) -> Cow<'_, [u8]> { - self.as_str().as_bytes().into() - } -} - -impl EncodeKey for RoomId { - fn encode_as_bytes(&self) -> Cow<'_, [u8]> { - self.as_str().as_bytes().into() - } -} - -impl EncodeKey for OwnedRoomId { - fn encode_as_bytes(&self) -> Cow<'_, [u8]> { - self.as_str().as_bytes().into() - } -} - -impl EncodeKey for TransactionId { - fn encode_as_bytes(&self) -> Cow<'_, [u8]> { - self.as_str().as_bytes().into() - } -} - -impl EncodeKey for MxcUri { - fn encode_as_bytes(&self) -> Cow<'_, [u8]> { - let s: &str = self.as_ref(); - s.as_bytes().into() - } -} - -impl EncodeKey for SecretName { - fn encode_as_bytes(&self) -> Cow<'_, [u8]> { - let s: &str = self.as_ref(); - s.as_bytes().into() - } -} - -impl EncodeKey for ReceiptType { - fn encode_as_bytes(&self) -> Cow<'_, [u8]> { - let s: &str = self.as_ref(); - s.as_bytes().into() - } -} - -impl EncodeKey for RoomAccountDataEventType { - fn encode_as_bytes(&self) -> Cow<'_, [u8]> { - self.to_string().as_bytes().to_vec().into() - } -} - -impl EncodeKey for UserId { - fn encode_as_bytes(&self) -> Cow<'_, [u8]> { - self.as_str().as_bytes().into() - } -} - -impl EncodeKey for OwnedUserId { - fn encode_as_bytes(&self) -> Cow<'_, [u8]> { - self.as_str().as_bytes().into() - } -} - -impl EncodeKey for StateEventType { - fn encode_as_bytes(&self) -> Cow<'_, [u8]> { - self.to_string().as_bytes().to_vec().into() - } -} - -impl EncodeKey for GlobalAccountDataEventType { - fn encode_as_bytes(&self) -> Cow<'_, [u8]> { - self.to_string().as_bytes().to_vec().into() - } -} - -impl EncodeKey for (A, B) -where - A: EncodeKey, - B: EncodeKey, -{ - fn encode(&self) -> Vec { - [ - self.0.encode_as_bytes().deref(), - &[ENCODE_SEPARATOR], - self.1.encode_as_bytes().deref(), - &[ENCODE_SEPARATOR], - ] - .concat() - } - - fn encode_secure(&self, table_name: &str, store_cipher: &StoreCipher) -> Vec { - [ - store_cipher.hash_key(table_name, &self.0.encode_as_bytes()).as_slice(), - &[ENCODE_SEPARATOR], - store_cipher.hash_key(table_name, &self.1.encode_as_bytes()).as_slice(), - &[ENCODE_SEPARATOR], - ] - .concat() - } -} - -impl EncodeKey for (A, B, C) -where - A: EncodeKey, - B: EncodeKey, - C: EncodeKey, -{ - fn encode(&self) -> Vec { - [ - self.0.encode_as_bytes().deref(), - &[ENCODE_SEPARATOR], - self.1.encode_as_bytes().deref(), - &[ENCODE_SEPARATOR], - self.2.encode_as_bytes().deref(), - &[ENCODE_SEPARATOR], - ] - .concat() - } - - fn encode_secure(&self, table_name: &str, store_cipher: &StoreCipher) -> Vec { - [ - store_cipher.hash_key(table_name, &self.0.encode_as_bytes()).as_slice(), - &[ENCODE_SEPARATOR], - store_cipher.hash_key(table_name, &self.1.encode_as_bytes()).as_slice(), - &[ENCODE_SEPARATOR], - store_cipher.hash_key(table_name, &self.2.encode_as_bytes()).as_slice(), - &[ENCODE_SEPARATOR], - ] - .concat() - } -} - -impl EncodeKey for (A, B, C, D) -where - A: EncodeKey, - B: EncodeKey, - C: EncodeKey, - D: EncodeKey, -{ - fn encode(&self) -> Vec { - [ - self.0.encode_as_bytes().deref(), - &[ENCODE_SEPARATOR], - self.1.encode_as_bytes().deref(), - &[ENCODE_SEPARATOR], - self.2.encode_as_bytes().deref(), - &[ENCODE_SEPARATOR], - self.3.encode_as_bytes().deref(), - &[ENCODE_SEPARATOR], - ] - .concat() - } - - fn encode_secure(&self, table_name: &str, store_cipher: &StoreCipher) -> Vec { - [ - store_cipher.hash_key(table_name, &self.0.encode_as_bytes()).as_slice(), - &[ENCODE_SEPARATOR], - store_cipher.hash_key(table_name, &self.1.encode_as_bytes()).as_slice(), - &[ENCODE_SEPARATOR], - store_cipher.hash_key(table_name, &self.2.encode_as_bytes()).as_slice(), - &[ENCODE_SEPARATOR], - store_cipher.hash_key(table_name, &self.3.encode_as_bytes()).as_slice(), - &[ENCODE_SEPARATOR], - ] - .concat() - } -} - -impl EncodeKey for (A, B, C, D, E) -where - A: EncodeKey, - B: EncodeKey, - C: EncodeKey, - D: EncodeKey, - E: EncodeKey, -{ - fn encode(&self) -> Vec { - [ - self.0.encode_as_bytes().deref(), - &[ENCODE_SEPARATOR], - self.1.encode_as_bytes().deref(), - &[ENCODE_SEPARATOR], - self.2.encode_as_bytes().deref(), - &[ENCODE_SEPARATOR], - self.3.encode_as_bytes().deref(), - &[ENCODE_SEPARATOR], - self.4.encode_as_bytes().deref(), - &[ENCODE_SEPARATOR], - ] - .concat() - } - - fn encode_secure(&self, table_name: &str, store_cipher: &StoreCipher) -> Vec { - [ - store_cipher.hash_key(table_name, &self.0.encode_as_bytes()).as_slice(), - &[ENCODE_SEPARATOR], - store_cipher.hash_key(table_name, &self.1.encode_as_bytes()).as_slice(), - &[ENCODE_SEPARATOR], - store_cipher.hash_key(table_name, &self.2.encode_as_bytes()).as_slice(), - &[ENCODE_SEPARATOR], - store_cipher.hash_key(table_name, &self.3.encode_as_bytes()).as_slice(), - &[ENCODE_SEPARATOR], - store_cipher.hash_key(table_name, &self.4.encode_as_bytes()).as_slice(), - &[ENCODE_SEPARATOR], - ] - .concat() - } -} diff --git a/crates/matrix-sdk-sled/src/lib.rs b/crates/matrix-sdk-sled/src/lib.rs deleted file mode 100644 index 8dc8e293d..000000000 --- a/crates/matrix-sdk-sled/src/lib.rs +++ /dev/null @@ -1,92 +0,0 @@ -#[cfg(any(feature = "state-store", feature = "crypto-store"))] -use matrix_sdk_base::store::StoreConfig; -#[cfg(feature = "state-store")] -use matrix_sdk_base::store::StoreError; -#[cfg(feature = "crypto-store")] -use matrix_sdk_crypto::store::CryptoStoreError; -use sled::Error as SledError; -use thiserror::Error; - -#[cfg(feature = "crypto-store")] -mod crypto_store; -mod encode_key; -#[cfg(feature = "state-store")] -mod state_store; - -#[cfg(feature = "crypto-store")] -pub use crypto_store::SledCryptoStore; -#[cfg(feature = "state-store")] -pub use state_store::{MigrationConflictStrategy, SledStateStore, SledStateStoreBuilder}; - -/// All the errors that can occur when opening a sled store. -#[derive(Error, Debug)] -#[non_exhaustive] -pub enum OpenStoreError { - /// An error occurred with the state store implementation. - #[cfg(feature = "state-store")] - #[error(transparent)] - State(#[from] StoreError), - - /// An error occurred with the crypto store implementation. - #[cfg(feature = "crypto-store")] - #[error(transparent)] - Crypto(#[from] CryptoStoreError), - - /// An error occurred with sled. - #[error(transparent)] - Sled(#[from] SledError), -} - -/// Create a [`StoreConfig`] with an opened [`SledStateStore`] that uses the -/// given path and passphrase. -/// -/// If the `e2e-encryption` Cargo feature is enabled, a [`SledCryptoStore`] with -/// the same parameters is also opened. -/// -/// [`StoreConfig`]: #StoreConfig -#[cfg(any(feature = "state-store", feature = "crypto-store"))] -pub async fn make_store_config( - path: impl AsRef, - passphrase: Option<&str>, -) -> Result { - #[cfg(all(feature = "crypto-store", feature = "state-store"))] - { - let (state_store, crypto_store) = open_stores_with_path(path, passphrase).await?; - Ok(StoreConfig::new().state_store(state_store).crypto_store(crypto_store)) - } - - #[cfg(all(feature = "crypto-store", not(feature = "state-store")))] - { - let crypto_store = SledCryptoStore::open(path, passphrase).await?; - Ok(StoreConfig::new().crypto_store(crypto_store)) - } - - #[cfg(not(feature = "crypto-store"))] - { - let mut store_builder = SledStateStore::builder().path(path.as_ref().to_path_buf()); - - if let Some(passphrase) = passphrase { - store_builder = store_builder.passphrase(passphrase.to_owned()); - } - let state_store = store_builder.build().map_err(StoreError::backend)?; - - Ok(StoreConfig::new().state_store(state_store)) - } -} - -/// Create a [`StateStore`] and a [`CryptoStore`] that use the same database and -/// passphrase. -#[cfg(all(feature = "state-store", feature = "crypto-store"))] -async fn open_stores_with_path( - path: impl AsRef, - passphrase: Option<&str>, -) -> Result<(SledStateStore, SledCryptoStore), OpenStoreError> { - let mut store_builder = SledStateStore::builder().path(path.as_ref().to_path_buf()); - if let Some(passphrase) = passphrase { - store_builder = store_builder.passphrase(passphrase.to_owned()); - } - - let state_store = store_builder.build().map_err(StoreError::backend)?; - let crypto_store = state_store.open_crypto_store().await?; - Ok((state_store, crypto_store)) -} diff --git a/crates/matrix-sdk-sled/src/state_store/migrations.rs b/crates/matrix-sdk-sled/src/state_store/migrations.rs deleted file mode 100644 index 938f7e91e..000000000 --- a/crates/matrix-sdk-sled/src/state_store/migrations.rs +++ /dev/null @@ -1,825 +0,0 @@ -// Copyright 2021 The Matrix.org Foundation C.I.C. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use matrix_sdk_base::{ - store::{Result as StoreResult, StoreError}, - RoomInfo, StateStoreDataKey, -}; -use ruma::{ - events::{ - room::member::{StrippedRoomMemberEvent, SyncRoomMemberEvent}, - StateEventType, - }, - serde::Raw, -}; -use serde_json::value::{RawValue as RawJsonValue, Value as JsonValue}; -use sled::{transaction::TransactionError, Batch, Transactional, Tree}; -use tracing::debug; - -use super::{keys, Result, RoomMember, SledStateStore, SledStoreError}; -use crate::encode_key::EncodeKey; - -const DATABASE_VERSION: u8 = 7; - -const VERSION_KEY: &str = "state-store-version"; - -/// Sometimes Migrations can't proceed without having to drop existing -/// data. This allows you to configure, how these cases should be handled. -#[derive(PartialEq, Eq, Clone, Debug)] -pub enum MigrationConflictStrategy { - /// Just drop the data, we don't care that we have to sync again - Drop, - /// Raise a `SledStoreError::MigrationConflict` error with the path to the - /// DB in question. The caller then has to take care about what they want - /// to do and try again after. - Raise, - /// _Default_: The _entire_ database is backed up under - /// `$path.$timestamp.backup` (this includes the crypto store if they - /// are linked), before the state tables are dropped. - BackupAndDrop, -} - -impl SledStateStore { - pub(super) fn upgrade(&mut self) -> Result<()> { - let old_version = self.db_version()?; - - if old_version == 0 { - // we are fresh, let's write the current version - return self.set_db_version(DATABASE_VERSION); - } - if old_version == DATABASE_VERSION { - // current, we don't have to do anything - return Ok(()); - }; - - debug!(old_version, new_version = DATABASE_VERSION, "Upgrading the Sled state store"); - - if old_version == 1 && self.store_cipher.is_some() { - // we stored some fields un-encrypted. Drop them to force re-creation - return Err(SledStoreError::MigrationConflict { - path: self.path.take().expect("Path must exist for a migration to fail"), - old_version: old_version.into(), - new_version: DATABASE_VERSION.into(), - }); - } - - if old_version < 3 { - self.migrate_to_v3()?; - } - - if old_version < 4 { - self.migrate_to_v4()?; - } - - if old_version < 5 { - self.migrate_to_v5()?; - return Ok(()); - } - - // Version 6 was dropped and migration is similar to v7. - - if old_version < 7 { - self.migrate_to_v7()?; - return Ok(()); - } - - // FUTURE UPGRADE CODE GOES HERE - - // can't upgrade from that version to the new one - Err(SledStoreError::MigrationConflict { - path: self.path.take().expect("Path must exist for a migration to fail"), - old_version: old_version.into(), - new_version: DATABASE_VERSION.into(), - }) - } - - /// Get the version of the database. - /// - /// Returns `0` for a new database. - fn db_version(&self) -> Result { - Ok(self - .inner - .get(VERSION_KEY)? - .map(|v| { - let (version_bytes, _) = v.split_at(std::mem::size_of::()); - u8::from_be_bytes(version_bytes.try_into().unwrap_or_default()) - }) - .unwrap_or_default()) - } - - fn set_db_version(&self, version: u8) -> Result<()> { - self.inner.insert(VERSION_KEY, version.to_be_bytes().as_ref())?; - self.inner.flush()?; - Ok(()) - } - - pub fn drop_v1_tables(self) -> StoreResult<()> { - for name in V1_DB_STORES { - self.inner.drop_tree(name).map_err(StoreError::backend)?; - } - self.inner.remove(VERSION_KEY).map_err(StoreError::backend)?; - - Ok(()) - } - - fn v3_fix_tree(&self, tree: &Tree, batch: &mut Batch) -> Result<()> { - fn maybe_fix_json(raw_json: &RawJsonValue) -> Result> { - let json = raw_json.get(); - - if json.contains(r#""content":null"#) { - let mut value: JsonValue = serde_json::from_str(json)?; - if let Some(content) = value.get_mut("content") { - if matches!(content, JsonValue::Null) { - *content = JsonValue::Object(Default::default()); - return Ok(Some(value)); - } - } - } - - Ok(None) - } - - for entry in tree.iter() { - let (key, value) = entry?; - let raw_json: Box = self.deserialize_value(&value)?; - - if let Some(fixed_json) = maybe_fix_json(&raw_json)? { - batch.insert(key, self.serialize_value(&fixed_json)?); - } - } - - Ok(()) - } - - fn migrate_to_v3(&self) -> Result<()> { - let mut room_info_batch = sled::Batch::default(); - self.v3_fix_tree(&self.room_info, &mut room_info_batch)?; - - let mut room_state_batch = sled::Batch::default(); - self.v3_fix_tree(&self.room_state, &mut room_state_batch)?; - - let ret: Result<(), TransactionError> = (&self.room_info, &self.room_state) - .transaction(|(room_info, room_state)| { - room_info.apply_batch(&room_info_batch)?; - room_state.apply_batch(&room_state_batch)?; - - Ok(()) - }); - ret?; - - self.set_db_version(3u8) - } - - /// Replace the SYNC_TOKEN and SESSION trees by KV. - fn migrate_to_v4(&self) -> Result<()> { - { - let session = &self.inner.open_tree(old_keys::SESSION)?; - let mut batch = sled::Batch::default(); - - // Sync token - let sync_token = session.get(StateStoreDataKey::SYNC_TOKEN.encode())?; - if let Some(sync_token) = sync_token { - batch.insert(StateStoreDataKey::SYNC_TOKEN.encode(), sync_token); - } - - // Filters - let key = self.encode_key(keys::SESSION, StateStoreDataKey::FILTER); - for res in session.scan_prefix(key) { - let (key, value) = res?; - batch.insert(key, value); - } - self.kv.apply_batch(batch)?; - } - - // This was unused so we can just drop it. - self.inner.drop_tree(old_keys::SYNC_TOKEN)?; - self.inner.drop_tree(old_keys::SESSION)?; - - self.set_db_version(4) - } - - /// Move the member events with the other state events. - fn migrate_to_v5(&self) -> Result<()> { - { - let members = &self.inner.open_tree(old_keys::MEMBER)?; - let mut state_batch = sled::Batch::default(); - - for room_info in - self.room_info.iter().map(|r| self.deserialize_value::(&r?.1)) - { - let room_info = room_info?; - let room_id = room_info.room_id(); - let prefix = self.encode_key(old_keys::MEMBER, room_id); - - for entry in members.scan_prefix(prefix) { - let (_, value) = entry?; - let raw_member_event = - self.deserialize_value::>(&value)?; - let state_key = - raw_member_event.get_field::("state_key")?.unwrap_or_default(); - let key = self.encode_key( - keys::ROOM_STATE, - (room_id, StateEventType::RoomMember, state_key), - ); - state_batch.insert(key, value); - } - } - - let stripped_members = &self.inner.open_tree(old_keys::STRIPPED_ROOM_MEMBER)?; - let mut stripped_state_batch = sled::Batch::default(); - - for room_info in - self.stripped_room_infos.iter().map(|r| self.deserialize_value::(&r?.1)) - { - let room_info = room_info?; - let room_id = room_info.room_id(); - let prefix = self.encode_key(old_keys::STRIPPED_ROOM_MEMBER, room_id); - - for entry in stripped_members.scan_prefix(prefix) { - let (_, value) = entry?; - let raw_member_event = - self.deserialize_value::>(&value)?; - let state_key = - raw_member_event.get_field::("state_key")?.unwrap_or_default(); - let key = self.encode_key( - keys::STRIPPED_ROOM_STATE, - (room_id, StateEventType::RoomMember, state_key), - ); - stripped_state_batch.insert(key, value); - } - } - - let ret: Result<(), TransactionError> = - (&self.room_state, &self.stripped_room_state).transaction( - |(room_state, stripped_room_state)| { - room_state.apply_batch(&state_batch)?; - stripped_room_state.apply_batch(&stripped_state_batch)?; - - Ok(()) - }, - ); - ret?; - } - - self.inner.drop_tree(old_keys::MEMBER)?; - self.inner.drop_tree(old_keys::STRIPPED_ROOM_MEMBER)?; - - self.set_db_version(5) - } - - /// Remove the old user IDs stores and populate the new ones. - fn migrate_to_v7(&self) -> Result<()> { - { - // Reset v6 stores. - self.user_ids.clear()?; - self.stripped_user_ids.clear()?; - - // We only have joined and invited user IDs in the old stores, so instead we - // use the room member events to populate the new stores. - let state = &self.inner.open_tree(keys::ROOM_STATE)?; - let mut user_ids_batch = sled::Batch::default(); - - for room_info in - self.room_info.iter().map(|r| self.deserialize_value::(&r?.1)) - { - let room_info = room_info?; - let room_id = room_info.room_id(); - let prefix = - self.encode_key(keys::ROOM_STATE, (room_id, StateEventType::RoomMember)); - - for entry in state.scan_prefix(prefix) { - let (_, value) = entry?; - let member_event = self - .deserialize_value::>(&value)? - .deserialize()?; - let key = self.encode_key(keys::USER_ID, (room_id, member_event.state_key())); - let value = self.serialize_value(&RoomMember::from(&member_event))?; - user_ids_batch.insert(key, value); - } - } - - let stripped_state = &self.inner.open_tree(keys::STRIPPED_ROOM_STATE)?; - let mut stripped_user_ids_batch = sled::Batch::default(); - - for room_info in - self.stripped_room_infos.iter().map(|r| self.deserialize_value::(&r?.1)) - { - let room_info = room_info?; - let room_id = room_info.room_id(); - let prefix = self - .encode_key(keys::STRIPPED_ROOM_STATE, (room_id, StateEventType::RoomMember)); - - for entry in stripped_state.scan_prefix(prefix) { - let (_, value) = entry?; - let stripped_member_event = self - .deserialize_value::>(&value)? - .deserialize()?; - let key = self.encode_key( - keys::STRIPPED_USER_ID, - (room_id, &stripped_member_event.state_key), - ); - let value = self.serialize_value(&RoomMember::from(&stripped_member_event))?; - stripped_user_ids_batch.insert(key, value); - } - } - - let ret: Result<(), TransactionError> = - (&self.user_ids, &self.stripped_user_ids).transaction( - |(user_ids, stripped_user_ids)| { - user_ids.apply_batch(&user_ids_batch)?; - stripped_user_ids.apply_batch(&stripped_user_ids_batch)?; - - Ok(()) - }, - ); - ret?; - } - - self.inner.drop_tree(old_keys::JOINED_USER_ID)?; - self.inner.drop_tree(old_keys::INVITED_USER_ID)?; - self.inner.drop_tree(old_keys::STRIPPED_JOINED_USER_ID)?; - self.inner.drop_tree(old_keys::STRIPPED_INVITED_USER_ID)?; - - self.set_db_version(7) - } -} - -mod old_keys { - /// Old stores. - pub const SYNC_TOKEN: &str = "sync_token"; - pub const SESSION: &str = "session"; - pub const MEMBER: &str = "member"; - pub const STRIPPED_ROOM_MEMBER: &str = "stripped-room-member"; - pub const INVITED_USER_ID: &str = "invited-user-id"; - pub const JOINED_USER_ID: &str = "joined-user-id"; - pub const STRIPPED_INVITED_USER_ID: &str = "stripped-invited-user-id"; - pub const STRIPPED_JOINED_USER_ID: &str = "stripped-joined-user-id"; -} - -pub const V1_DB_STORES: &[&str] = &[ - keys::ACCOUNT_DATA, - old_keys::SYNC_TOKEN, - keys::DISPLAY_NAME, - old_keys::INVITED_USER_ID, - old_keys::JOINED_USER_ID, - keys::MEDIA, - old_keys::MEMBER, - keys::PRESENCE, - keys::PROFILE, - keys::ROOM_ACCOUNT_DATA, - keys::ROOM_EVENT_RECEIPT, - keys::ROOM_INFO, - keys::ROOM_STATE, - keys::ROOM_USER_RECEIPT, - keys::ROOM, - old_keys::SESSION, - old_keys::STRIPPED_INVITED_USER_ID, - old_keys::STRIPPED_JOINED_USER_ID, - keys::STRIPPED_ROOM_INFO, - old_keys::STRIPPED_ROOM_MEMBER, - keys::STRIPPED_ROOM_STATE, - keys::CUSTOM, -]; - -#[cfg(test)] -mod test { - use assert_matches::assert_matches; - use matrix_sdk_base::{ - deserialized_responses::RawMemberEvent, RoomInfo, RoomMemberships, RoomState, - StateStoreDataKey, - }; - use matrix_sdk_test::{async_test, test_json}; - use ruma::{ - events::{ - room::{ - member::{StrippedRoomMemberEvent, SyncRoomMemberEvent}, - topic::RoomTopicEventContent, - }, - AnySyncStateEvent, StateEventType, - }, - room_id, - serde::Raw, - user_id, - }; - use serde_json::json; - use tempfile::TempDir; - - use super::{old_keys, MigrationConflictStrategy}; - use crate::{ - encode_key::EncodeKey, - state_store::{keys, Result, SledStateStore, SledStoreError}, - }; - - #[async_test] - pub async fn migrating_v1_to_2_plain() -> Result<()> { - let folder = TempDir::new()?; - - let store = SledStateStore::builder().path(folder.path().to_path_buf()).build()?; - - store.set_db_version(1u8)?; - drop(store); - - // this transparently migrates to the latest version - let _store = SledStateStore::builder().path(folder.path().to_path_buf()).build()?; - Ok(()) - } - - #[async_test] - pub async fn migrating_v1_to_2_with_pw_backed_up() -> Result<()> { - let folder = TempDir::new()?; - - let store = SledStateStore::builder() - .path(folder.path().to_path_buf()) - .passphrase("something".to_owned()) - .build()?; - - store.set_db_version(1u8)?; - drop(store); - - // this transparently creates a backup and a fresh db - let _store = SledStateStore::builder() - .path(folder.path().to_path_buf()) - .passphrase("something".to_owned()) - .build()?; - assert_eq!(std::fs::read_dir(folder.path())?.count(), 2); - Ok(()) - } - - #[async_test] - pub async fn migrating_v1_to_2_with_pw_drop() -> Result<()> { - let folder = TempDir::new()?; - - let store = SledStateStore::builder() - .path(folder.path().to_path_buf()) - .passphrase("other thing".to_owned()) - .build()?; - - store.set_db_version(1u8)?; - drop(store); - - // this transparently creates a backup and a fresh db - let _store = SledStateStore::builder() - .path(folder.path().to_path_buf()) - .passphrase("other thing".to_owned()) - .migration_conflict_strategy(MigrationConflictStrategy::Drop) - .build()?; - assert_eq!(std::fs::read_dir(folder.path())?.count(), 1); - Ok(()) - } - - #[async_test] - pub async fn migrating_v1_to_2_with_pw_raises() -> Result<()> { - let folder = TempDir::new()?; - - let store = SledStateStore::builder() - .path(folder.path().to_path_buf()) - .passphrase("secret".to_owned()) - .build()?; - - store.set_db_version(1u8)?; - drop(store); - - // this transparently creates a backup and a fresh db - let res = SledStateStore::builder() - .path(folder.path().to_path_buf()) - .passphrase("secret".to_owned()) - .migration_conflict_strategy(MigrationConflictStrategy::Raise) - .build(); - if let Err(SledStoreError::MigrationConflict { .. }) = res { - // all good - } else { - panic!("Didn't raise the expected error: {res:?}"); - } - assert_eq!(std::fs::read_dir(folder.path())?.count(), 1); - Ok(()) - } - - #[async_test] - pub async fn migrating_v2_to_v3() { - // An event that fails to deserialize. - let wrong_redacted_state_event = json!({ - "content": null, - "event_id": "$wrongevent", - "origin_server_ts": 1673887516047_u64, - "sender": "@example:localhost", - "state_key": "", - "type": "m.room.topic", - "unsigned": { - "redacted_because": { - "type": "m.room.redaction", - "sender": "@example:localhost", - "content": {}, - "redacts": "$wrongevent", - "origin_server_ts": 1673893816047_u64, - "unsigned": {}, - "event_id": "$redactionevent", - }, - }, - }); - serde_json::from_value::(wrong_redacted_state_event.clone()) - .unwrap_err(); - - let room_id = room_id!("!some_room:localhost"); - let folder = TempDir::new().unwrap(); - - let store = SledStateStore::builder() - .path(folder.path().to_path_buf()) - .passphrase("secret".to_owned()) - .build() - .unwrap(); - - store - .room_state - .insert( - store.encode_key(keys::ROOM_STATE, (room_id, StateEventType::RoomTopic, "")), - store.serialize_value(&wrong_redacted_state_event).unwrap(), - ) - .unwrap(); - store.set_db_version(2u8).unwrap(); - drop(store); - - let store = SledStateStore::builder() - .path(folder.path().to_path_buf()) - .passphrase("secret".to_owned()) - .build() - .unwrap(); - let event = - store.get_state_event(room_id, StateEventType::RoomTopic, "").await.unwrap().unwrap(); - event.cast::().deserialize().unwrap(); - } - - #[async_test] - pub async fn migrating_v3_to_v4() { - let sync_token = "a_very_unique_string"; - let filter_1 = "filter_1"; - let filter_1_id = "filter_1_id"; - let filter_2 = "filter_2"; - let filter_2_id = "filter_2_id"; - - let folder = TempDir::new().unwrap(); - let store = SledStateStore::builder() - .path(folder.path().to_path_buf()) - .passphrase("secret".to_owned()) - .build() - .unwrap(); - - let session = store.inner.open_tree(old_keys::SESSION).unwrap(); - let mut batch = sled::Batch::default(); - batch.insert( - StateStoreDataKey::SYNC_TOKEN.encode(), - store.serialize_value(&sync_token).unwrap(), - ); - batch.insert( - store.encode_key(keys::SESSION, (StateStoreDataKey::FILTER, filter_1)), - store.serialize_value(&filter_1_id).unwrap(), - ); - batch.insert( - store.encode_key(keys::SESSION, (StateStoreDataKey::FILTER, filter_2)), - store.serialize_value(&filter_2_id).unwrap(), - ); - session.apply_batch(batch).unwrap(); - - store.set_db_version(3).unwrap(); - drop(session); - drop(store); - - let store = SledStateStore::builder() - .path(folder.path().to_path_buf()) - .passphrase("secret".to_owned()) - .build() - .unwrap(); - - let stored_sync_token = store - .get_kv_data(StateStoreDataKey::SyncToken) - .await - .unwrap() - .unwrap() - .into_sync_token() - .unwrap(); - assert_eq!(stored_sync_token, sync_token); - - let stored_filter_1_id = store - .get_kv_data(StateStoreDataKey::Filter(filter_1)) - .await - .unwrap() - .unwrap() - .into_filter() - .unwrap(); - assert_eq!(stored_filter_1_id, filter_1_id); - - let stored_filter_2_id = store - .get_kv_data(StateStoreDataKey::Filter(filter_2)) - .await - .unwrap() - .unwrap() - .into_filter() - .unwrap(); - assert_eq!(stored_filter_2_id, filter_2_id); - } - - #[async_test] - pub async fn migrating_v4_to_v5() { - let room_id = room_id!("!room:localhost"); - let member_event = - Raw::new(&*test_json::MEMBER_INVITE).unwrap().cast::(); - let user_id = user_id!("@invited:localhost"); - - let stripped_room_id = room_id!("!stripped_room:localhost"); - let stripped_member_event = - Raw::new(&*test_json::MEMBER_STRIPPED).unwrap().cast::(); - let stripped_user_id = user_id!("@example:localhost"); - - let folder = TempDir::new().unwrap(); - { - let store = SledStateStore::builder() - .path(folder.path().to_path_buf()) - .passphrase("secret".to_owned()) - .build() - .unwrap(); - - let members = store.inner.open_tree(old_keys::MEMBER).unwrap(); - members - .insert( - store.encode_key(old_keys::MEMBER, (room_id, user_id)), - store.serialize_value(&member_event).unwrap(), - ) - .unwrap(); - let room_infos = store.inner.open_tree(keys::ROOM_INFO).unwrap(); - let room_info = RoomInfo::new(room_id, RoomState::Joined); - room_infos - .insert( - store.encode_key(keys::ROOM_INFO, room_id), - store.serialize_value(&room_info).unwrap(), - ) - .unwrap(); - - let stripped_members = store.inner.open_tree(old_keys::STRIPPED_ROOM_MEMBER).unwrap(); - stripped_members - .insert( - store.encode_key( - old_keys::STRIPPED_ROOM_MEMBER, - (stripped_room_id, stripped_user_id), - ), - store.serialize_value(&stripped_member_event).unwrap(), - ) - .unwrap(); - let stripped_room_infos = store.inner.open_tree(keys::STRIPPED_ROOM_INFO).unwrap(); - let stripped_room_info = RoomInfo::new(stripped_room_id, RoomState::Invited); - stripped_room_infos - .insert( - store.encode_key(keys::STRIPPED_ROOM_INFO, stripped_room_id), - store.serialize_value(&stripped_room_info).unwrap(), - ) - .unwrap(); - - store.set_db_version(4).unwrap(); - } - - let store = SledStateStore::builder() - .path(folder.path().to_path_buf()) - .passphrase("secret".to_owned()) - .build() - .unwrap(); - - let stored_member_event = assert_matches!( - store.get_member_event(room_id, user_id).await, - Ok(Some(RawMemberEvent::Sync(e))) => e - ); - assert_eq!(stored_member_event.json().get(), member_event.json().get()); - - let stored_stripped_member_event = assert_matches!( - store.get_member_event(stripped_room_id, stripped_user_id).await, - Ok(Some(RawMemberEvent::Stripped(e))) => e - ); - assert_eq!(stored_stripped_member_event.json().get(), stripped_member_event.json().get()); - } - - #[async_test] - pub async fn migrating_v5_to_v7() { - let room_id = room_id!("!room:localhost"); - let invite_member_event = - Raw::new(&*test_json::MEMBER_INVITE).unwrap().cast::(); - let invite_user_id = user_id!("@invited:localhost"); - let ban_member_event = - Raw::new(&*test_json::MEMBER_BAN).unwrap().cast::(); - let ban_user_id = user_id!("@banned:localhost"); - - let stripped_room_id = room_id!("!stripped_room:localhost"); - let stripped_member_event = - Raw::new(&*test_json::MEMBER_STRIPPED).unwrap().cast::(); - let stripped_user_id = user_id!("@example:localhost"); - - let folder = TempDir::new().unwrap(); - { - let store = SledStateStore::builder() - .path(folder.path().to_path_buf()) - .passphrase("secret".to_owned()) - .build() - .unwrap(); - - let state = store.inner.open_tree(keys::ROOM_STATE).unwrap(); - state - .insert( - store.encode_key( - keys::ROOM_STATE, - (room_id, StateEventType::RoomMember, invite_user_id), - ), - store.serialize_value(&invite_member_event).unwrap(), - ) - .unwrap(); - state - .insert( - store.encode_key( - keys::ROOM_STATE, - (room_id, StateEventType::RoomMember, ban_user_id), - ), - store.serialize_value(&ban_member_event).unwrap(), - ) - .unwrap(); - let room_infos = store.inner.open_tree(keys::ROOM_INFO).unwrap(); - let room_info = RoomInfo::new(room_id, RoomState::Joined); - room_infos - .insert( - store.encode_key(keys::ROOM_INFO, room_id), - store.serialize_value(&room_info).unwrap(), - ) - .unwrap(); - - let stripped_state = store.inner.open_tree(keys::STRIPPED_ROOM_STATE).unwrap(); - stripped_state - .insert( - store.encode_key( - keys::STRIPPED_ROOM_STATE, - (stripped_room_id, StateEventType::RoomMember, stripped_user_id), - ), - store.serialize_value(&stripped_member_event).unwrap(), - ) - .unwrap(); - let stripped_room_infos = store.inner.open_tree(keys::STRIPPED_ROOM_INFO).unwrap(); - let stripped_room_info = RoomInfo::new(stripped_room_id, RoomState::Invited); - stripped_room_infos - .insert( - store.encode_key(keys::STRIPPED_ROOM_INFO, stripped_room_id), - store.serialize_value(&stripped_room_info).unwrap(), - ) - .unwrap(); - - store.set_db_version(5).unwrap(); - } - - let store = SledStateStore::builder() - .path(folder.path().to_path_buf()) - .passphrase("secret".to_owned()) - .build() - .unwrap(); - - assert_eq!( - store.get_user_ids(room_id, RoomMemberships::JOIN, false).await.unwrap().len(), - 0 - ); - assert_eq!( - store.get_user_ids(room_id, RoomMemberships::INVITE, false).await.unwrap().as_slice(), - [invite_user_id.to_owned()] - ); - let user_ids = store.get_user_ids(room_id, RoomMemberships::empty(), false).await.unwrap(); - assert_eq!(user_ids.len(), 2); - assert!(user_ids.contains(&invite_user_id.to_owned())); - assert!(user_ids.contains(&ban_user_id.to_owned())); - - assert_eq!( - store - .get_user_ids(stripped_room_id, RoomMemberships::JOIN, true) - .await - .unwrap() - .as_slice(), - [stripped_user_id.to_owned()] - ); - assert_eq!( - store - .get_user_ids(stripped_room_id, RoomMemberships::INVITE, true) - .await - .unwrap() - .len(), - 0 - ); - assert_eq!( - store - .get_user_ids(stripped_room_id, RoomMemberships::empty(), true) - .await - .unwrap() - .as_slice(), - [stripped_user_id.to_owned()] - ); - } -} diff --git a/crates/matrix-sdk-sled/src/state_store/mod.rs b/crates/matrix-sdk-sled/src/state_store/mod.rs deleted file mode 100644 index 4509246f3..000000000 --- a/crates/matrix-sdk-sled/src/state_store/mod.rs +++ /dev/null @@ -1,1434 +0,0 @@ -// Copyright 2021 The Matrix.org Foundation C.I.C. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::{ - collections::BTreeSet, - path::PathBuf, - sync::Arc, - time::{Instant, SystemTime, UNIX_EPOCH}, -}; - -use async_trait::async_trait; -use futures_core::stream::Stream; -use futures_util::stream::{self, TryStreamExt}; -use matrix_sdk_base::{ - deserialized_responses::{RawAnySyncOrStrippedState, RawMemberEvent}, - media::{MediaRequest, UniqueKey}, - store::{Result as StoreResult, StateChanges, StateStore, StoreError}, - MinimalStateEvent, RoomInfo, RoomMemberships, StateStoreDataKey, StateStoreDataValue, -}; -use matrix_sdk_store_encryption::{Error as KeyEncryptionError, StoreCipher}; -use ruma::{ - canonical_json::redact, - events::{ - presence::PresenceEvent, - receipt::{Receipt, ReceiptThread, ReceiptType}, - room::member::{ - MembershipState, RoomMemberEventContent, StrippedRoomMemberEvent, SyncRoomMemberEvent, - }, - AnyGlobalAccountDataEvent, AnyRoomAccountDataEvent, AnySyncStateEvent, - GlobalAccountDataEventType, RoomAccountDataEventType, StateEventType, SyncStateEvent, - }, - serde::Raw, - CanonicalJsonObject, EventId, IdParseError, MxcUri, OwnedEventId, OwnedUserId, RoomId, - RoomVersionId, UserId, -}; -use serde::{de::DeserializeOwned, Deserialize, Serialize}; -use sled::{ - transaction::{ConflictableTransactionError, TransactionError}, - Config, Db, Transactional, Tree, -}; -use tokio::task::spawn_blocking; -use tracing::{debug, info, warn}; - -mod migrations; - -pub use self::migrations::MigrationConflictStrategy; -#[cfg(feature = "crypto-store")] -use super::OpenStoreError; -use crate::encode_key::{EncodeKey, EncodeUnchecked}; -#[cfg(feature = "crypto-store")] -pub use crate::SledCryptoStore; - -#[derive(Debug, thiserror::Error)] -pub enum SledStoreError { - #[error(transparent)] - Json(#[from] serde_json::Error), - #[error(transparent)] - Encryption(#[from] KeyEncryptionError), - #[error(transparent)] - StoreError(#[from] StoreError), - #[error(transparent)] - TransactionError(#[from] sled::Error), - #[error(transparent)] - Identifier(#[from] IdParseError), - #[error(transparent)] - Task(#[from] tokio::task::JoinError), - #[error(transparent)] - Io(#[from] std::io::Error), - #[error(transparent)] - FsExtra(#[from] fs_extra::error::Error), - #[error("Can't migrate {path} from {old_version} to {new_version} without deleting data. See MigrationConflictStrategy for ways to configure.")] - MigrationConflict { path: PathBuf, old_version: usize, new_version: usize }, -} - -impl From> for SledStoreError { - fn from(e: TransactionError) -> Self { - match e { - TransactionError::Abort(e) => e, - TransactionError::Storage(e) => SledStoreError::TransactionError(e), - } - } -} - -impl From for StoreError { - fn from(err: SledStoreError) -> StoreError { - match err { - SledStoreError::Json(e) => StoreError::Json(e), - SledStoreError::Identifier(e) => StoreError::Identifier(e), - SledStoreError::Encryption(e) => StoreError::Encryption(e), - SledStoreError::StoreError(e) => e, - _ => StoreError::backend(err), - } - } -} - -mod keys { - // Static keys - pub const SESSION: &str = "session"; - - // Stores - pub const ACCOUNT_DATA: &str = "account-data"; - pub const CUSTOM: &str = "custom"; - pub const DISPLAY_NAME: &str = "display-name"; - pub const USER_ID: &str = "user-ids"; - pub const MEDIA: &str = "media"; - pub const PRESENCE: &str = "presence"; - pub const PROFILE: &str = "profile"; - pub const ROOM_ACCOUNT_DATA: &str = "room-account-data"; - pub const ROOM_EVENT_RECEIPT: &str = "room-event-receipt"; - pub const ROOM_INFO: &str = "room-info"; - pub const ROOM_STATE: &str = "room-state"; - pub const ROOM_USER_RECEIPT: &str = "room-user-receipt"; - pub const ROOM: &str = "room"; - pub const STRIPPED_USER_ID: &str = "stripped-user-ids"; - pub const STRIPPED_ROOM_INFO: &str = "stripped-room-info"; - pub const STRIPPED_ROOM_STATE: &str = "stripped-room-state"; - pub const KV: &str = "kv"; -} - -type Result = std::result::Result; - -#[derive(Debug, Clone)] -enum DbOrPath { - Db(Db), - Path(PathBuf), -} - -/// Builder for [`SledStateStore`]. -#[derive(Debug)] -pub struct SledStateStoreBuilder { - db_or_path: Option, - passphrase: Option, - migration_conflict_strategy: MigrationConflictStrategy, -} - -impl SledStateStoreBuilder { - fn new() -> Self { - Self { - db_or_path: None, - passphrase: None, - migration_conflict_strategy: MigrationConflictStrategy::BackupAndDrop, - } - } - - /// Path to the sled store files, created if not it doesn't exist yet. - /// - /// Mutually exclusive with [`db`][Self::db], whichever is called last wins. - pub fn path(mut self, path: PathBuf) -> Self { - self.db_or_path = Some(DbOrPath::Path(path)); - self - } - - /// Use the given [`sled::Db`]. - /// - /// Mutually exclusive with [`path`][Self::path], whichever is called last - /// wins. - pub fn db(mut self, db: Db) -> Self { - self.db_or_path = Some(DbOrPath::Db(db)); - self - } - - /// Set the password the sled store is encrypted with (if any). - pub fn passphrase(mut self, value: String) -> Self { - self.passphrase = Some(value); - self - } - - /// Set the strategy to use when a merge conflict is found. - /// - /// See [`MigrationConflictStrategy`] for details. - pub fn migration_conflict_strategy(mut self, value: MigrationConflictStrategy) -> Self { - self.migration_conflict_strategy = value; - self - } - - /// Create a [`SledStateStore`] with the options set on this builder. - /// - /// # Errors - /// - /// This method can fail for two general reasons: - /// - /// * Invalid path: The [`sled::Db`] could not be opened at the supplied - /// path. - /// * Migration error: The migration to a newer version of the schema - /// failed, see `SledStoreError::MigrationConflict`. - pub fn build(self) -> Result { - let (db, path) = match &self.db_or_path { - None => { - let db = Config::new().temporary(true).open().map_err(StoreError::backend)?; - (db, None) - } - Some(DbOrPath::Db(db)) => (db.clone(), None), - Some(DbOrPath::Path(path)) => { - let path = path.join("matrix-sdk-state"); - let db = Config::new().path(&path).open().map_err(StoreError::backend)?; - (db, Some(path)) - } - }; - - let store_cipher = if let Some(passphrase) = &self.passphrase { - if let Some(inner) = db.get("store_cipher".encode())? { - Some(StoreCipher::import(passphrase, &inner)?.into()) - } else { - let cipher = StoreCipher::new()?; - #[cfg(not(test))] - let export = cipher.export(passphrase)?; - #[cfg(test)] - let export = cipher._insecure_export_fast_for_testing(passphrase)?; - db.insert("store_cipher".encode(), export)?; - Some(cipher.into()) - } - } else { - None - }; - - let mut store = SledStateStore::open_helper(db, path, store_cipher)?; - - let migration_res = store.upgrade(); - if let Err(SledStoreError::MigrationConflict { path, .. }) = &migration_res { - // how are supposed to react about this? - match self.migration_conflict_strategy { - MigrationConflictStrategy::BackupAndDrop => { - let mut new_path = path.clone(); - new_path.set_extension(format!( - "{}.backup", - SystemTime::now() - .duration_since(UNIX_EPOCH) - .expect("time doesn't go backwards") - .as_secs() - )); - fs_extra::dir::create_all(&new_path, false)?; - fs_extra::dir::copy(path, new_path, &fs_extra::dir::CopyOptions::new())?; - store.drop_v1_tables()?; - return self.build(); - } - MigrationConflictStrategy::Drop => { - store.drop_v1_tables()?; - return self.build(); - } - MigrationConflictStrategy::Raise => migration_res?, - } - } else { - migration_res?; - } - - Ok(store) - } - - // testing only - #[cfg(test)] - fn build_encrypted() -> StoreResult { - let db = Config::new().temporary(true).open().map_err(StoreError::backend)?; - - SledStateStore::open_helper( - db, - None, - Some(StoreCipher::new().expect("can't create store cipher").into()), - ) - .map_err(|e| e.into()) - } -} - -#[derive(Clone)] -pub struct SledStateStore { - path: Option, - pub(crate) inner: Db, - store_cipher: Option>, - kv: Tree, - account_data: Tree, - profiles: Tree, - display_names: Tree, - user_ids: Tree, - room_info: Tree, - room_state: Tree, - room_account_data: Tree, - stripped_user_ids: Tree, - stripped_room_infos: Tree, - stripped_room_state: Tree, - presence: Tree, - room_user_receipts: Tree, - room_event_receipts: Tree, - media: Tree, - custom: Tree, -} - -impl std::fmt::Debug for SledStateStore { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - if let Some(path) = &self.path { - f.debug_struct("SledStateStore").field("path", &path).finish() - } else { - f.debug_struct("SledStateStore").field("path", &"memory store").finish() - } - } -} - -impl SledStateStore { - fn open_helper( - db: Db, - path: Option, - store_cipher: Option>, - ) -> Result { - let kv = db.open_tree(keys::KV)?; - let account_data = db.open_tree(keys::ACCOUNT_DATA)?; - - let profiles = db.open_tree(keys::PROFILE)?; - let display_names = db.open_tree(keys::DISPLAY_NAME)?; - let user_ids = db.open_tree(keys::USER_ID)?; - - let room_state = db.open_tree(keys::ROOM_STATE)?; - let room_info = db.open_tree(keys::ROOM_INFO)?; - let presence = db.open_tree(keys::PRESENCE)?; - let room_account_data = db.open_tree(keys::ROOM_ACCOUNT_DATA)?; - - let stripped_user_ids = db.open_tree(keys::STRIPPED_USER_ID)?; - let stripped_room_infos = db.open_tree(keys::STRIPPED_ROOM_INFO)?; - let stripped_room_state = db.open_tree(keys::STRIPPED_ROOM_STATE)?; - - let room_user_receipts = db.open_tree(keys::ROOM_USER_RECEIPT)?; - let room_event_receipts = db.open_tree(keys::ROOM_EVENT_RECEIPT)?; - - let media = db.open_tree(keys::MEDIA)?; - - let custom = db.open_tree(keys::CUSTOM)?; - - Ok(Self { - path, - inner: db, - store_cipher, - kv, - account_data, - profiles, - display_names, - user_ids, - room_account_data, - presence, - room_state, - room_info, - stripped_user_ids, - stripped_room_infos, - stripped_room_state, - room_user_receipts, - room_event_receipts, - media, - custom, - }) - } - - /// Create a [`SledStateStoreBuilder`] with default parameters. - pub fn builder() -> SledStateStoreBuilder { - SledStateStoreBuilder::new() - } - - /// Open a `SledCryptoStore` that uses the same database as this store. - /// - /// The given passphrase will be used to encrypt private data. - #[cfg(feature = "crypto-store")] - pub async fn open_crypto_store(&self) -> Result { - let db = self.inner.clone(); - let store_cipher = self.store_cipher.clone(); - SledCryptoStore::open_helper(db, None, store_cipher).await - } - - fn serialize_value(&self, event: &impl Serialize) -> Result, SledStoreError> { - if let Some(key) = &self.store_cipher { - Ok(key.encrypt_value(event)?) - } else { - Ok(serde_json::to_vec(event)?) - } - } - - fn deserialize_value(&self, event: &[u8]) -> Result { - if let Some(key) = &self.store_cipher { - Ok(key.decrypt_value(event)?) - } else { - Ok(serde_json::from_slice(event)?) - } - } - - fn encode_key(&self, table_name: &str, key: T) -> Vec { - if let Some(store_cipher) = &self.store_cipher { - key.encode_secure(table_name, store_cipher).to_vec() - } else { - key.encode() - } - } - - fn encode_kv_data_key(&self, key: StateStoreDataKey<'_>) -> Vec { - match key { - StateStoreDataKey::SyncToken => StateStoreDataKey::SYNC_TOKEN.encode(), - StateStoreDataKey::Filter(filter_name) => { - self.encode_key(keys::SESSION, (StateStoreDataKey::FILTER, filter_name)) - } - StateStoreDataKey::UserAvatarUrl(user_id) => { - self.encode_key(keys::SESSION, (StateStoreDataKey::USER_AVATAR_URL, user_id)) - } - } - } - - async fn get_kv_data(&self, key: StateStoreDataKey<'_>) -> Result> { - let encoded_key = self.encode_kv_data_key(key); - - let value = - self.kv.get(encoded_key)?.map(|e| self.deserialize_value::(&e)).transpose()?; - - let value = match key { - StateStoreDataKey::SyncToken => value.map(StateStoreDataValue::SyncToken), - StateStoreDataKey::Filter(_) => value.map(StateStoreDataValue::Filter), - StateStoreDataKey::UserAvatarUrl(_) => value.map(StateStoreDataValue::UserAvatarUrl), - }; - - Ok(value) - } - - async fn set_kv_data( - &self, - key: StateStoreDataKey<'_>, - value: StateStoreDataValue, - ) -> Result<()> { - let encoded_key = self.encode_kv_data_key(key); - - let value = match key { - StateStoreDataKey::SyncToken => { - value.into_sync_token().expect("Session data not a sync token") - } - StateStoreDataKey::Filter(_) => value.into_filter().expect("Session data not a filter"), - StateStoreDataKey::UserAvatarUrl(_) => { - value.into_user_avatar_url().expect("Session data not an user avatar url") - } - }; - - self.kv.insert(encoded_key, self.serialize_value(&value)?)?; - - Ok(()) - } - - async fn remove_kv_data(&self, key: StateStoreDataKey<'_>) -> Result<()> { - let encoded_key = self.encode_kv_data_key(key); - - self.kv.remove(encoded_key)?; - - Ok(()) - } - - pub async fn save_changes(&self, changes: &StateChanges) -> Result<()> { - let now = Instant::now(); - - // room state & memberships - let ret: Result<(), TransactionError> = ( - &self.profiles, - &self.display_names, - &self.user_ids, - &self.room_info, - &self.room_state, - &self.room_account_data, - &self.stripped_user_ids, - &self.stripped_room_infos, - &self.stripped_room_state, - ) - .transaction( - |( - profiles, - display_names, - user_ids, - rooms, - state, - room_account_data, - stripped_user_ids, - stripped_rooms, - stripped_state, - )| { - for (room_id, ambiguity_maps) in &changes.ambiguity_maps { - for (display_name, map) in ambiguity_maps { - display_names.insert( - self.encode_key(keys::DISPLAY_NAME, (room_id, display_name)), - self.serialize_value(&map) - .map_err(ConflictableTransactionError::Abort)?, - )?; - } - } - - for (room, events) in &changes.room_account_data { - for (event_type, event) in events { - room_account_data.insert( - self.encode_key(keys::ROOM_ACCOUNT_DATA, (room, event_type)), - self.serialize_value(&event) - .map_err(ConflictableTransactionError::Abort)?, - )?; - } - } - - for (room, event_types) in &changes.state { - let profile_changes = changes.profiles.get(room); - - for (event_type, events) in event_types { - for (state_key, raw_event) in events { - state.insert( - self.encode_key( - keys::ROOM_STATE, - (room, event_type, state_key), - ), - self.serialize_value(&raw_event) - .map_err(ConflictableTransactionError::Abort)?, - )?; - stripped_state.remove(self.encode_key( - keys::STRIPPED_ROOM_STATE, - (room, event_type, state_key), - ))?; - - if *event_type == StateEventType::RoomMember { - let event = - match raw_event.deserialize_as::() { - Ok(ev) => ev, - Err(e) => { - let event_id: Option = - raw_event.get_field("event_id").ok().flatten(); - debug!( - event_id, - "Failed to deserialize member event: {e}" - ); - continue; - } - }; - - let key = (room, state_key); - - stripped_user_ids - .remove(self.encode_key(keys::STRIPPED_USER_ID, key))?; - - user_ids.insert( - self.encode_key(keys::USER_ID, key), - self.serialize_value(&RoomMember::from(&event)) - .map_err(ConflictableTransactionError::Abort)?, - )?; - - if let Some(profile) = - profile_changes.and_then(|p| p.get(event.state_key())) - { - profiles.insert( - self.encode_key(keys::PROFILE, key), - self.serialize_value(&profile) - .map_err(ConflictableTransactionError::Abort)?, - )?; - } - } - } - } - } - - for (room_id, room_info) in &changes.room_infos { - rooms.insert( - self.encode_key(keys::ROOM, room_id), - self.serialize_value(room_info) - .map_err(ConflictableTransactionError::Abort)?, - )?; - stripped_rooms - .remove(self.encode_key(keys::STRIPPED_ROOM_INFO, room_id))?; - } - - for (room_id, info) in &changes.stripped_room_infos { - stripped_rooms.insert( - self.encode_key(keys::STRIPPED_ROOM_INFO, room_id), - self.serialize_value(&info) - .map_err(ConflictableTransactionError::Abort)?, - )?; - rooms.remove(self.encode_key(keys::ROOM, room_id))?; - } - - for (room, event_types) in &changes.stripped_state { - for (event_type, events) in event_types { - for (state_key, raw_event) in events { - stripped_state.insert( - self.encode_key( - keys::STRIPPED_ROOM_STATE, - (room, event_type.to_string(), state_key), - ), - self.serialize_value(&raw_event) - .map_err(ConflictableTransactionError::Abort)?, - )?; - - if *event_type == StateEventType::RoomMember { - let event = match raw_event - .deserialize_as::() - { - Ok(ev) => ev, - Err(e) => { - let event_id: Option = - raw_event.get_field("event_id").ok().flatten(); - debug!( - event_id, - "Failed to deserialize stripped member event: {e}" - ); - continue; - } - }; - - let key = (room, state_key); - stripped_user_ids.insert( - self.encode_key(keys::STRIPPED_USER_ID, key), - self.serialize_value(&RoomMember::from(&event)) - .map_err(ConflictableTransactionError::Abort)?, - )?; - } - } - } - } - - Ok(()) - }, - ); - - ret?; - - if !changes.redactions.is_empty() { - let mut redactions_found = false; - let mut redactions_batch = sled::Batch::default(); - - let make_room_version = |room_id| { - self.room_info - .get(self.encode_key(keys::ROOM, room_id)) - .ok() - .flatten() - .map(|r| self.deserialize_value::(&r)) - .transpose() - .ok() - .flatten() - .and_then(|info| info.room_version().cloned()) - .unwrap_or_else(|| { - warn!(?room_id, "Unable to find the room version, assume version 9"); - RoomVersionId::V9 - }) - }; - - for (room_id, redactions) in &changes.redactions { - let key_prefix = self.encode_key(keys::ROOM_STATE, room_id); - let mut room_version = None; - - // iterate through all saved state events and check whether they are among the - // redacted, if so apply redaction and save that to the batch - // applied all at once after. - for (key, evt) in self.room_state.scan_prefix(key_prefix).filter_map(|r| r.ok()) { - let raw_evt = self.deserialize_value::>(&evt)?; - if let Ok(Some(event_id)) = raw_evt.get_field::("event_id") { - if let Some(redaction) = redactions.get(&event_id) { - let redacted = redact( - raw_evt.deserialize_as::()?, - room_version.get_or_insert_with(|| make_room_version(room_id)), - Some(redaction.try_into()?), - ) - .map_err(StoreError::Redaction)?; - redactions_found = true; - redactions_batch.insert(key, self.serialize_value(&redacted)?); - } - } - } - } - - if redactions_found { - self.room_state.apply_batch(redactions_batch)?; - } - } - - let ret: Result<(), TransactionError> = - (&self.room_user_receipts, &self.room_event_receipts, &self.presence).transaction( - |(room_user_receipts, room_event_receipts, presence)| { - for (room, content) in &changes.receipts { - for (event_id, receipts) in &content.0 { - for (receipt_type, receipts) in receipts { - for (user_id, receipt) in receipts { - // Add the receipt to the room user receipts - let key = match receipt.thread.as_str() { - Some(thread_id) => self.encode_key( - keys::ROOM_USER_RECEIPT, - (room, receipt_type, thread_id, user_id), - ), - None => self.encode_key( - keys::ROOM_USER_RECEIPT, - (room, receipt_type, user_id), - ), - }; - if let Some(old) = room_user_receipts.insert( - key, - self.serialize_value(&(event_id, receipt)) - .map_err(ConflictableTransactionError::Abort)?, - )? { - // Remove the old receipt from the room event receipts - let (old_event, _): (OwnedEventId, Receipt) = self - .deserialize_value(&old) - .map_err(ConflictableTransactionError::Abort)?; - let key = match receipt.thread.as_str() { - Some(thread_id) => self.encode_key( - keys::ROOM_EVENT_RECEIPT, - (room, receipt_type, thread_id, old_event, user_id), - ), - None => self.encode_key( - keys::ROOM_EVENT_RECEIPT, - (room, receipt_type, old_event, user_id), - ), - }; - room_event_receipts.remove(key)?; - } - - // Add the receipt to the room event receipts - let key = match receipt.thread.as_str() { - Some(thread_id) => self.encode_key( - keys::ROOM_EVENT_RECEIPT, - (room, receipt_type, thread_id, event_id, user_id), - ), - None => self.encode_key( - keys::ROOM_EVENT_RECEIPT, - (room, receipt_type, event_id, user_id), - ), - }; - room_event_receipts.insert( - key, - self.serialize_value(&(user_id, receipt)) - .map_err(ConflictableTransactionError::Abort)?, - )?; - } - } - } - } - - for (sender, event) in &changes.presence { - presence.insert( - self.encode_key(keys::PRESENCE, sender), - self.serialize_value(&event) - .map_err(ConflictableTransactionError::Abort)?, - )?; - } - - Ok(()) - }, - ); - - ret?; - - // user state - let ret: Result<(), TransactionError> = (&self.kv, &self.account_data) - .transaction(|(kv, account_data)| { - if let Some(s) = &changes.sync_token { - kv.insert( - self.encode_kv_data_key(StateStoreDataKey::SyncToken), - self.serialize_value(s).map_err(ConflictableTransactionError::Abort)?, - )?; - } - - for (event_type, event) in &changes.account_data { - account_data.insert( - self.encode_key(keys::ACCOUNT_DATA, event_type), - self.serialize_value(&event) - .map_err(ConflictableTransactionError::Abort)?, - )?; - } - - Ok(()) - }); - - ret?; - - self.inner.flush_async().await?; - - info!("Saved changes in {:?}", now.elapsed()); - - Ok(()) - } - - pub async fn get_presence_event(&self, user_id: &UserId) -> Result>> { - let db = self.clone(); - let key = self.encode_key(keys::PRESENCE, user_id); - spawn_blocking(move || db.presence.get(key)?.map(|e| db.deserialize_value(&e)).transpose()) - .await? - } - - pub async fn get_state_event( - &self, - room_id: &RoomId, - event_type: StateEventType, - state_key: &str, - ) -> Result> { - let db = self.clone(); - let key = self.encode_key(keys::ROOM_STATE, (room_id, event_type.to_string(), state_key)); - let stripped_key = self - .encode_key(keys::STRIPPED_ROOM_STATE, (room_id, event_type.to_string(), state_key)); - spawn_blocking(move || { - if let Some(e) = db - .stripped_room_state - .get(stripped_key)? - .map(|v| db.deserialize_value(&v)) - .transpose()? - { - Ok(Some(RawAnySyncOrStrippedState::Stripped(e))) - } else if let Some(e) = - db.room_state.get(key)?.map(|v| db.deserialize_value(&v)).transpose()? - { - Ok(Some(RawAnySyncOrStrippedState::Sync(e))) - } else { - Ok(None) - } - }) - .await? - } - - pub async fn get_state_events( - &self, - room_id: &RoomId, - event_type: StateEventType, - ) -> Result> { - let db = self.clone(); - let key = self.encode_key(keys::ROOM_STATE, (room_id, event_type.to_string())); - let stripped_key = - self.encode_key(keys::STRIPPED_ROOM_STATE, (room_id, event_type.to_string())); - spawn_blocking(move || { - let stripped_events = db - .stripped_room_state - .scan_prefix(stripped_key) - .flat_map(|e| { - e.map(|(_, e)| { - db.deserialize_value(&e).map(RawAnySyncOrStrippedState::Stripped) - }) - }) - .collect::, _>>()?; - - if !stripped_events.is_empty() { - return Ok(stripped_events); - } - - db.room_state - .scan_prefix(key) - .flat_map(|e| { - e.map(|(_, e)| db.deserialize_value(&e).map(RawAnySyncOrStrippedState::Sync)) - }) - .collect::>() - }) - .await? - } - - pub async fn get_profile( - &self, - room_id: &RoomId, - user_id: &UserId, - ) -> Result>> { - let db = self.clone(); - let key = self.encode_key(keys::PROFILE, (room_id, user_id)); - spawn_blocking(move || db.profiles.get(key)?.map(|p| db.deserialize_value(&p)).transpose()) - .await? - } - - pub async fn get_member_event( - &self, - room_id: &RoomId, - state_key: &UserId, - ) -> Result> { - self.get_state_event(room_id, StateEventType::RoomMember, state_key.as_str()) - .await - .map(|opt| opt.map(|raw| raw.cast())) - } - - /// Get the user IDs for the given room with the given memberships and - /// stripped state. - pub async fn get_user_ids( - &self, - room_id: &RoomId, - memberships: RoomMemberships, - stripped: bool, - ) -> StoreResult> { - let db = self.clone(); - let store_name = if stripped { keys::STRIPPED_USER_ID } else { keys::USER_ID }; - let key = self.encode_key(store_name, room_id); - - spawn_blocking(move || { - let tree = if stripped { &db.stripped_user_ids } else { &db.user_ids }; - - tree.scan_prefix(key) - .map(move |u| { - let member = db - .deserialize_value::(&u.map_err(StoreError::backend)?.1) - .map_err(StoreError::backend)?; - - Ok(memberships.matches(&member.membership).then_some(member.user_id)) - }) - .filter_map(|u| u.transpose()) - .collect::>>() - }) - .await - .map_err(StoreError::backend)? - } - - pub async fn get_room_infos(&self) -> Result>> { - let db = self.clone(); - spawn_blocking(move || { - stream::iter(db.room_info.iter().map(move |r| db.deserialize_value(&r?.1))) - }) - .await - .map_err(Into::into) - } - - pub async fn get_stripped_room_infos(&self) -> Result>> { - let db = self.clone(); - spawn_blocking(move || { - stream::iter(db.stripped_room_infos.iter().map(move |r| db.deserialize_value(&r?.1))) - }) - .await - .map_err(Into::into) - } - - pub async fn get_users_with_display_name( - &self, - room_id: &RoomId, - display_name: &str, - ) -> Result> { - let db = self.clone(); - let key = self.encode_key(keys::DISPLAY_NAME, (room_id, display_name)); - spawn_blocking(move || { - Ok(db - .display_names - .get(key)? - .map(|m| db.deserialize_value(&m)) - .transpose()? - .unwrap_or_default()) - }) - .await? - } - - pub async fn get_account_data_event( - &self, - event_type: GlobalAccountDataEventType, - ) -> Result>> { - let db = self.clone(); - let key = self.encode_key(keys::ACCOUNT_DATA, event_type); - spawn_blocking(move || { - db.account_data.get(key)?.map(|m| db.deserialize_value(&m)).transpose() - }) - .await? - } - - pub async fn get_room_account_data_event( - &self, - room_id: &RoomId, - event_type: RoomAccountDataEventType, - ) -> Result>> { - let db = self.clone(); - let key = self.encode_key(keys::ROOM_ACCOUNT_DATA, (room_id, event_type)); - spawn_blocking(move || { - db.room_account_data.get(key)?.map(|m| db.deserialize_value(&m)).transpose() - }) - .await? - } - - async fn get_user_room_receipt_event( - &self, - room_id: &RoomId, - receipt_type: ReceiptType, - thread: ReceiptThread, - user_id: &UserId, - ) -> Result> { - let db = self.clone(); - let key = match thread.as_str() { - Some(thread_id) => self - .encode_key(keys::ROOM_USER_RECEIPT, (room_id, receipt_type, thread_id, user_id)), - None => self.encode_key(keys::ROOM_USER_RECEIPT, (room_id, receipt_type, user_id)), - }; - spawn_blocking(move || { - db.room_user_receipts.get(key)?.map(|m| db.deserialize_value(&m)).transpose() - }) - .await? - } - - async fn get_event_room_receipt_events( - &self, - room_id: &RoomId, - receipt_type: ReceiptType, - thread: ReceiptThread, - event_id: &EventId, - ) -> StoreResult> { - let db = self.clone(); - let key = match thread.as_str() { - Some(thread_id) => self - .encode_key(keys::ROOM_EVENT_RECEIPT, (room_id, receipt_type, thread_id, event_id)), - None => self.encode_key(keys::ROOM_EVENT_RECEIPT, (room_id, receipt_type, event_id)), - }; - spawn_blocking(move || { - db.room_event_receipts - .scan_prefix(key) - .values() - .map(|u| { - let v = u.map_err(StoreError::backend)?; - db.deserialize_value(&v).map_err(StoreError::backend) - }) - .collect() - }) - .await - .map_err(StoreError::backend)? - } - - async fn add_media_content(&self, request: &MediaRequest, data: Vec) -> Result<()> { - self.media.insert( - self.encode_key( - keys::MEDIA, - (request.source.unique_key(), request.format.unique_key()), - ), - self.serialize_value(&data)?, - )?; - - self.inner.flush_async().await?; - - Ok(()) - } - - async fn get_media_content(&self, request: &MediaRequest) -> Result>> { - let db = self.clone(); - let key = self - .encode_key(keys::MEDIA, (request.source.unique_key(), request.format.unique_key())); - - spawn_blocking(move || { - db.media.get(key)?.map(move |m| db.deserialize_value(&m)).transpose() - }) - .await? - } - - async fn get_custom_value(&self, key: &[u8]) -> Result>> { - let custom = self.custom.clone(); - let me = self.clone(); - let key = self.encode_key(keys::CUSTOM, EncodeUnchecked::from(key)); - spawn_blocking(move || custom.get(key)?.map(move |v| me.deserialize_value(&v)).transpose()) - .await? - } - - async fn set_custom_value(&self, key: &[u8], value: Vec) -> Result>> { - let key = self.encode_key(keys::CUSTOM, EncodeUnchecked::from(key)); - let me = self.clone(); - let ret = self - .custom - .insert(key, me.serialize_value(&value)?)? - .map(|v| me.deserialize_value(&v)) - .transpose(); - self.inner.flush_async().await?; - - ret - } - - async fn remove_custom_value(&self, key: &[u8]) -> Result>> { - let key = self.encode_key(keys::CUSTOM, EncodeUnchecked::from(key)); - let me = self.clone(); - let ret = self.custom.remove(key)?.map(|v| me.deserialize_value(&v)).transpose(); - self.inner.flush_async().await?; - - ret - } - - async fn remove_media_content(&self, request: &MediaRequest) -> Result<()> { - self.media.remove( - self.encode_key( - keys::MEDIA, - (request.source.unique_key(), request.format.unique_key()), - ), - )?; - - Ok(()) - } - - async fn remove_media_content_for_uri(&self, uri: &MxcUri) -> Result<()> { - let keys = self.media.scan_prefix(self.encode_key(keys::MEDIA, uri)).keys(); - - let mut batch = sled::Batch::default(); - for key in keys { - batch.remove(key?); - } - - Ok(self.media.apply_batch(batch)?) - } - - async fn remove_room(&self, room_id: &RoomId) -> Result<()> { - let mut profiles_batch = sled::Batch::default(); - for key in self.profiles.scan_prefix(self.encode_key(keys::PROFILE, room_id)).keys() { - profiles_batch.remove(key?); - } - - let mut display_names_batch = sled::Batch::default(); - for key in - self.display_names.scan_prefix(self.encode_key(keys::DISPLAY_NAME, room_id)).keys() - { - display_names_batch.remove(key?); - } - - let mut user_ids_batch = sled::Batch::default(); - for key in self.user_ids.scan_prefix(self.encode_key(keys::USER_ID, room_id)).keys() { - user_ids_batch.remove(key?); - } - - let mut stripped_user_ids_batch = sled::Batch::default(); - for key in self - .stripped_user_ids - .scan_prefix(self.encode_key(keys::STRIPPED_USER_ID, room_id)) - .keys() - { - stripped_user_ids_batch.remove(key?); - } - - let mut room_state_batch = sled::Batch::default(); - for key in self.room_state.scan_prefix(self.encode_key(keys::ROOM_STATE, room_id)).keys() { - room_state_batch.remove(key?); - } - - let mut stripped_room_state_batch = sled::Batch::default(); - for key in self - .stripped_room_state - .scan_prefix(self.encode_key(keys::STRIPPED_ROOM_STATE, room_id)) - .keys() - { - stripped_room_state_batch.remove(key?); - } - - let mut room_account_data_batch = sled::Batch::default(); - for key in self - .room_account_data - .scan_prefix(self.encode_key(keys::ROOM_ACCOUNT_DATA, room_id)) - .keys() - { - room_account_data_batch.remove(key?); - } - - let ret: Result<(), TransactionError> = ( - &self.profiles, - &self.display_names, - &self.user_ids, - &self.stripped_user_ids, - &self.room_info, - &self.stripped_room_infos, - &self.room_state, - &self.stripped_room_state, - &self.room_account_data, - ) - .transaction( - |( - profiles, - display_names, - user_ids, - stripped_user_ids, - rooms, - stripped_rooms, - state, - stripped_state, - room_account_data, - )| { - rooms.remove(self.encode_key(keys::ROOM, room_id))?; - stripped_rooms.remove(self.encode_key(keys::STRIPPED_ROOM_INFO, room_id))?; - - profiles.apply_batch(&profiles_batch)?; - display_names.apply_batch(&display_names_batch)?; - user_ids.apply_batch(&user_ids_batch)?; - stripped_user_ids.apply_batch(&stripped_user_ids_batch)?; - state.apply_batch(&room_state_batch)?; - stripped_state.apply_batch(&stripped_room_state_batch)?; - room_account_data.apply_batch(&room_account_data_batch)?; - - Ok(()) - }, - ); - ret?; - - let mut room_user_receipts_batch = sled::Batch::default(); - for key in self - .room_user_receipts - .scan_prefix(self.encode_key(keys::ROOM_USER_RECEIPT, room_id)) - .keys() - { - room_user_receipts_batch.remove(key?); - } - - let mut room_event_receipts_batch = sled::Batch::default(); - for key in self - .room_event_receipts - .scan_prefix(self.encode_key(keys::ROOM_EVENT_RECEIPT, room_id)) - .keys() - { - room_event_receipts_batch.remove(key?); - } - - let ret: Result<(), TransactionError> = - (&self.room_user_receipts, &self.room_event_receipts).transaction( - |(room_user_receipts, room_event_receipts)| { - room_user_receipts.apply_batch(&room_user_receipts_batch)?; - room_event_receipts.apply_batch(&room_event_receipts_batch)?; - Ok(()) - }, - ); - ret?; - - self.inner.flush_async().await?; - - Ok(()) - } -} - -#[async_trait] -impl StateStore for SledStateStore { - type Error = StoreError; - - async fn get_kv_data( - &self, - key: StateStoreDataKey<'_>, - ) -> StoreResult> { - self.get_kv_data(key).await.map_err(Into::into) - } - - async fn set_kv_data( - &self, - key: StateStoreDataKey<'_>, - value: StateStoreDataValue, - ) -> StoreResult<()> { - self.set_kv_data(key, value).await.map_err(Into::into) - } - - async fn remove_kv_data(&self, key: StateStoreDataKey<'_>) -> StoreResult<()> { - self.remove_kv_data(key).await.map_err(Into::into) - } - - async fn save_changes(&self, changes: &StateChanges) -> StoreResult<()> { - self.save_changes(changes).await.map_err(Into::into) - } - - async fn get_presence_event( - &self, - user_id: &UserId, - ) -> StoreResult>> { - self.get_presence_event(user_id).await.map_err(Into::into) - } - - async fn get_state_event( - &self, - room_id: &RoomId, - event_type: StateEventType, - state_key: &str, - ) -> StoreResult> { - self.get_state_event(room_id, event_type, state_key).await.map_err(Into::into) - } - - async fn get_state_events( - &self, - room_id: &RoomId, - event_type: StateEventType, - ) -> StoreResult> { - self.get_state_events(room_id, event_type).await.map_err(Into::into) - } - - async fn get_profile( - &self, - room_id: &RoomId, - user_id: &UserId, - ) -> StoreResult>> { - self.get_profile(room_id, user_id).await.map_err(Into::into) - } - - async fn get_member_event( - &self, - room_id: &RoomId, - state_key: &UserId, - ) -> StoreResult> { - self.get_member_event(room_id, state_key).await.map_err(Into::into) - } - - async fn get_user_ids( - &self, - room_id: &RoomId, - memberships: RoomMemberships, - ) -> StoreResult> { - let v = self.get_user_ids(room_id, memberships, true).await?; - if !v.is_empty() { - return Ok(v); - } - self.get_user_ids(room_id, memberships, false).await - } - - async fn get_invited_user_ids(&self, room_id: &RoomId) -> StoreResult> { - StateStore::get_user_ids(self, room_id, RoomMemberships::INVITE).await - } - - async fn get_joined_user_ids(&self, room_id: &RoomId) -> StoreResult> { - StateStore::get_user_ids(self, room_id, RoomMemberships::JOIN).await - } - - async fn get_room_infos(&self) -> StoreResult> { - self.get_room_infos() - .await - .map_err::(Into::into)? - .try_collect() - .await - .map_err::(Into::into) - } - - async fn get_stripped_room_infos(&self) -> StoreResult> { - self.get_stripped_room_infos() - .await - .map_err::(Into::into)? - .try_collect() - .await - .map_err::(Into::into) - } - - async fn get_users_with_display_name( - &self, - room_id: &RoomId, - display_name: &str, - ) -> StoreResult> { - self.get_users_with_display_name(room_id, display_name).await.map_err(Into::into) - } - - async fn get_account_data_event( - &self, - event_type: GlobalAccountDataEventType, - ) -> StoreResult>> { - self.get_account_data_event(event_type).await.map_err(Into::into) - } - - async fn get_room_account_data_event( - &self, - room_id: &RoomId, - event_type: RoomAccountDataEventType, - ) -> StoreResult>> { - self.get_room_account_data_event(room_id, event_type).await.map_err(Into::into) - } - - async fn get_user_room_receipt_event( - &self, - room_id: &RoomId, - receipt_type: ReceiptType, - thread: ReceiptThread, - user_id: &UserId, - ) -> StoreResult> { - self.get_user_room_receipt_event(room_id, receipt_type, thread, user_id) - .await - .map_err(Into::into) - } - - async fn get_event_room_receipt_events( - &self, - room_id: &RoomId, - receipt_type: ReceiptType, - thread: ReceiptThread, - event_id: &EventId, - ) -> StoreResult> { - self.get_event_room_receipt_events(room_id, receipt_type, thread, event_id) - .await - .map_err(Into::into) - } - - async fn get_custom_value(&self, key: &[u8]) -> StoreResult>> { - self.get_custom_value(key).await.map_err(Into::into) - } - - async fn set_custom_value(&self, key: &[u8], value: Vec) -> StoreResult>> { - self.set_custom_value(key, value).await.map_err(Into::into) - } - - async fn remove_custom_value(&self, key: &[u8]) -> StoreResult>> { - self.remove_custom_value(key).await.map_err(Into::into) - } - - async fn add_media_content(&self, request: &MediaRequest, data: Vec) -> StoreResult<()> { - self.add_media_content(request, data).await.map_err(Into::into) - } - - async fn get_media_content(&self, request: &MediaRequest) -> StoreResult>> { - self.get_media_content(request).await.map_err(Into::into) - } - - async fn remove_media_content(&self, request: &MediaRequest) -> StoreResult<()> { - self.remove_media_content(request).await.map_err(Into::into) - } - - async fn remove_media_content_for_uri(&self, uri: &MxcUri) -> StoreResult<()> { - self.remove_media_content_for_uri(uri).await.map_err(Into::into) - } - - async fn remove_room(&self, room_id: &RoomId) -> StoreResult<()> { - self.remove_room(room_id).await.map_err(Into::into) - } -} - -/// A room member. -#[derive(Debug, Serialize, Deserialize)] -struct RoomMember { - user_id: OwnedUserId, - membership: MembershipState, -} - -impl From<&SyncStateEvent> for RoomMember { - fn from(event: &SyncStateEvent) -> Self { - Self { user_id: event.state_key().clone(), membership: event.membership().clone() } - } -} - -impl From<&StrippedRoomMemberEvent> for RoomMember { - fn from(event: &StrippedRoomMemberEvent) -> Self { - Self { user_id: event.state_key.clone(), membership: event.content.membership.clone() } - } -} - -#[cfg(test)] -mod tests { - use matrix_sdk_base::statestore_integration_tests; - - use super::{SledStateStore, StateStore, StoreResult}; - - async fn get_store() -> StoreResult { - SledStateStore::builder().build().map_err(Into::into) - } - - statestore_integration_tests!(with_media_tests); -} - -#[cfg(test)] -mod encrypted_tests { - use matrix_sdk_base::statestore_integration_tests; - - use super::{SledStateStoreBuilder, StateStore, StoreResult}; - - async fn get_store() -> StoreResult { - SledStateStoreBuilder::build_encrypted().map_err(Into::into) - } - - statestore_integration_tests!(with_media_tests); -} diff --git a/crates/matrix-sdk/CHANGELOG.md b/crates/matrix-sdk/CHANGELOG.md index 4d807ff4a..4c76f3d14 100644 --- a/crates/matrix-sdk/CHANGELOG.md +++ b/crates/matrix-sdk/CHANGELOG.md @@ -9,7 +9,7 @@ results by any membership state. - `Common::active_members(_no_sync)` and `Common::joined_members(_no_sync)` are deprecated. - `matrix-sdk-sqlite` is the new default store implementation outside of WASM, behind the `sqlite` feature. - - The `sled` feature was removed. It is still possible to use `matrix-sdk-sled` as a custom store. + - The `sled` feature was removed. The `matrix-sdk-sled` crate is deprecated and no longer maintained. - The `Common` methods to retrieve state events can now return a sync or stripped event, so it can be used for invited rooms too.