diff --git a/crates/matrix-sdk-base/src/store/integration_tests.rs b/crates/matrix-sdk-base/src/store/integration_tests.rs index 91c8873cc..2ea3448b6 100644 --- a/crates/matrix-sdk-base/src/store/integration_tests.rs +++ b/crates/matrix-sdk-base/src/store/integration_tests.rs @@ -358,6 +358,28 @@ macro_rules! statestore_integration_tests { assert!(!members.is_empty(), "We expected to find members for the room") } + #[async_test] + async fn test_filter_saving() { + let store = get_store().await.unwrap(); + let test_name = "filter_name"; + let filter_id = "filter_id_1234"; + assert_eq!(store.get_filter(test_name).await.unwrap(), None); + store.save_filter(test_name, filter_id).await.unwrap(); + assert_eq!(store.get_filter(test_name).await.unwrap(), Some(filter_id.to_owned())); + } + + #[async_test] + async fn test_sync_token_saving() { + let mut changes = StateChanges::default(); + let store = get_store().await.unwrap(); + let sync_token = "t392-516_47314_0_7_1".to_owned(); + + changes.sync_token = Some(sync_token.clone()); + assert_eq!(store.get_sync_token().await.unwrap(), None); + store.save_changes(&changes).await.unwrap(); + assert_eq!(store.get_sync_token().await.unwrap(), Some(sync_token)); + } + #[async_test] async fn test_stripped_member_saving() { let store = get_store().await.unwrap(); @@ -625,7 +647,7 @@ macro_rules! statestore_integration_tests { // Add sync response let sync = SyncResponse::try_from_http_response( - Response::builder().body(serde_json::to_vec(&*test_json::MORE_SYNC).unwrap()).unwrap(), + Response::builder().body(serde_json::to_vec(&*test_json::MORE_SYNC).expect("Parsing MORE_SYNC failed")).unwrap(), ) .unwrap(); @@ -643,7 +665,7 @@ macro_rules! statestore_integration_tests { ); let mut changes = StateChanges::new(sync.next_batch.clone()); changes.add_timeline(room_id, timeline_slice); - store.save_changes(&changes).await.unwrap(); + store.save_changes(&changes).await.expect("Saving room timeline failed"); check_timeline_events(room_id, &store, &stored_events, timeline.prev_batch.as_deref()) .await; @@ -651,7 +673,7 @@ macro_rules! statestore_integration_tests { // Add message response let messages = MessageResponse::try_from_http_response( Response::builder() - .body(serde_json::to_vec(&*test_json::SYNC_ROOM_MESSAGES_BATCH_1).unwrap()) + .body(serde_json::to_vec(&*test_json::SYNC_ROOM_MESSAGES_BATCH_1).expect("Parsing SYNC_ROOM_MESSAGES_BATCH_1 failed")) .unwrap(), ) .unwrap(); @@ -669,14 +691,14 @@ macro_rules! statestore_integration_tests { TimelineSlice::new(events, messages.start.clone(), messages.end.clone(), false, false); let mut changes = StateChanges::default(); changes.add_timeline(room_id, timeline_slice); - store.save_changes(&changes).await.unwrap(); + store.save_changes(&changes).await.expect("Saving room update timeline failed"); check_timeline_events(room_id, &store, &stored_events, messages.end.as_deref()).await; // Add second message response let messages = MessageResponse::try_from_http_response( Response::builder() - .body(serde_json::to_vec(&*test_json::SYNC_ROOM_MESSAGES_BATCH_2).unwrap()) + .body(serde_json::to_vec(&*test_json::SYNC_ROOM_MESSAGES_BATCH_2).expect("Parsing SYNC_ROOM_MESSAGES_BATCH_2 failed")) .unwrap(), ) .unwrap(); @@ -694,7 +716,7 @@ macro_rules! statestore_integration_tests { TimelineSlice::new(events, messages.start.clone(), messages.end.clone(), false, false); let mut changes = StateChanges::default(); changes.add_timeline(room_id, timeline_slice); - store.save_changes(&changes).await.unwrap(); + store.save_changes(&changes).await.expect("Saving room update timeline 2 failed"); check_timeline_events(room_id, &store, &stored_events, messages.end.as_deref()).await; @@ -722,7 +744,7 @@ macro_rules! statestore_integration_tests { ); let mut changes = StateChanges::new(sync.next_batch.clone()); changes.add_timeline(room_id, timeline_slice); - store.save_changes(&changes).await.unwrap(); + store.save_changes(&changes).await.expect("Saving room update timeline 3 failed"); check_timeline_events(room_id, &store, &stored_events, messages.end.as_deref()).await; @@ -737,7 +759,7 @@ macro_rules! statestore_integration_tests { ); let mut changes = StateChanges::default(); changes.add_timeline(room_id, timeline_slice); - store.save_changes(&changes).await.unwrap(); + store.save_changes(&changes).await.expect("Saving room update timeline 4 failed"); check_timeline_events(room_id, &store, &Vec::new(), end_token.as_deref()).await; } diff --git a/crates/matrix-sdk-indexeddb/src/state_store.rs b/crates/matrix-sdk-indexeddb/src/state_store.rs index b992a67d0..ff0c2089d 100644 --- a/crates/matrix-sdk-indexeddb/src/state_store.rs +++ b/crates/matrix-sdk-indexeddb/src/state_store.rs @@ -330,7 +330,7 @@ impl IndexeddbStore { obj.put_key_val( &self.encode_key(KEYS::FILTER, (KEYS::FILTER, filter_name)), - &JsValue::from_str(filter_id), + &self.serialize_event(&filter_id)?, )?; tx.await.into_result()?; @@ -339,13 +339,13 @@ impl IndexeddbStore { } pub async fn get_filter(&self, filter_name: &str) -> Result> { - Ok(self - .inner + self.inner .transaction_on_one_with_mode(KEYS::SESSION, IdbTransactionMode::Readonly)? .object_store(KEYS::SESSION)? .get(&self.encode_key(KEYS::FILTER, (KEYS::FILTER, filter_name)))? .await? - .and_then(|f| f.as_string())) + .map(|f| self.deserialize_event(f)) + .transpose() } pub async fn get_sync_token(&self) -> Result> { @@ -675,7 +675,7 @@ impl IndexeddbStore { let metadata: Option = timeline_metadata_store .get(&self.encode_key(KEYS::ROOM_TIMELINE_METADATA, room_id))? .await? - .map(|v| v.into_serde()) + .map(|v| self.deserialize_event(&v)) .transpose()?; if let Some(mut metadata) = metadata { if !timeline.sync && Some(&timeline.start) != metadata.end.as_ref() { @@ -834,7 +834,7 @@ impl IndexeddbStore { timeline_metadata_store.put_key_val_owned( &self.encode_key(KEYS::ROOM_TIMELINE_METADATA, room_id), - &JsValue::from_serde(&metadata)?, + &self.serialize_event(&metadata)?, )?; } } @@ -1474,11 +1474,13 @@ mod tests { wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser); use matrix_sdk_base::statestore_integration_tests; + use uuid::Uuid; use super::{IndexeddbStore, Result}; async fn get_store() -> Result { - Ok(IndexeddbStore::open().await?) + let db_name = format!("test-state-plain-{}", Uuid::new_v4().as_hyphenated().to_string()); + Ok(IndexeddbStore::open_helper(db_name, None).await?) } statestore_integration_tests! { integration } diff --git a/crates/matrix-sdk-sled/src/encode_key.rs b/crates/matrix-sdk-sled/src/encode_key.rs index 3b2a85cc3..b36738713 100644 --- a/crates/matrix-sdk-sled/src/encode_key.rs +++ b/crates/matrix-sdk-sled/src/encode_key.rs @@ -11,6 +11,23 @@ use ruma::{ RoomId, TransactionId, UserId, }; +/// Hold any data to be used as an encoding key +/// without checking for the existence of `ENCODE_SEPARATOR` within +pub struct EncodeUnchecked<'a>(&'a [u8]); + +impl<'a> EncodeUnchecked<'a> { + /// Wrap any `[u8]` + pub fn from(bytes: &'a [u8]) -> Self { + EncodeUnchecked(bytes) + } +} + +impl<'a> EncodeKey for EncodeUnchecked<'a> { + fn encode_as_bytes(&self) -> Cow<'a, [u8]> { + (self.0).into() + } +} + pub const ENCODE_SEPARATOR: u8 = 0xff; pub trait EncodeKey { diff --git a/crates/matrix-sdk-sled/src/state_store.rs b/crates/matrix-sdk-sled/src/state_store.rs index d5e4c70a0..101642eb9 100644 --- a/crates/matrix-sdk-sled/src/state_store.rs +++ b/crates/matrix-sdk-sled/src/state_store.rs @@ -62,9 +62,9 @@ use tokio::task::spawn_blocking; #[cfg(feature = "crypto-store")] use super::OpenStoreError; -use crate::encode_key::EncodeKey; #[cfg(feature = "experimental-timeline")] use crate::encode_key::ENCODE_SEPARATOR; +use crate::encode_key::{EncodeKey, EncodeUnchecked}; #[cfg(feature = "crypto-store")] pub use crate::CryptoStore; @@ -123,6 +123,7 @@ const VERSION_KEY: &str = "state-store-version"; const ACCOUNT_DATA: &str = "account-data"; const CUSTOM: &str = "custom"; +const SYNC_TOKEN: &str = "sync_token"; const DISPLAY_NAME: &str = "display-name"; const INVITED_USER_ID: &str = "invited-user-id"; const JOINED_USER_ID: &str = "joined-user-id"; @@ -362,7 +363,7 @@ impl SledStore { CryptoStore::open_with_database(self.inner.clone(), self.store_cipher.clone()) } - fn serialize_event(&self, event: &impl Serialize) -> Result, SledStoreError> { + fn serialize_value(&self, event: &impl Serialize) -> Result, SledStoreError> { if let Some(key) = &self.store_cipher { Ok(key.encrypt_value(event)?) } else { @@ -370,7 +371,7 @@ impl SledStore { } } - fn deserialize_event(&self, event: &[u8]) -> Result { + fn deserialize_value(&self, event: &[u8]) -> Result { if let Some(key) = &self.store_cipher { Ok(key.decrypt_value(event)?) } else { @@ -398,22 +399,22 @@ impl SledStore { } pub async fn save_filter(&self, filter_name: &str, filter_id: &str) -> Result<()> { - self.session.insert(self.encode_key(SESSION, ("filter", filter_name)), filter_id)?; + self.session.insert( + self.encode_key(SESSION, ("filter", filter_name)), + self.serialize_value(&filter_id)?, + )?; Ok(()) } pub async fn get_filter(&self, filter_name: &str) -> Result> { - Ok(self - .session + self.session .get(self.encode_key(SESSION, ("filter", filter_name)))? - .map(|f| String::from_utf8_lossy(&f).to_string())) + .map(|f| self.deserialize_value(&f)) + .transpose() } pub async fn get_sync_token(&self) -> Result> { - Ok(self - .session - .get("sync_token".encode())? - .map(|t| String::from_utf8_lossy(&t).to_string())) + self.session.get(SYNC_TOKEN.encode())?.map(|t| self.deserialize_value(&t)).transpose() } pub async fn save_changes(&self, changes: &StateChanges) -> Result<()> { @@ -461,14 +462,16 @@ impl SledStore { MembershipState::Join => { joined.insert( self.encode_key(JOINED_USER_ID, &key), - event.state_key().as_str(), + self.serialize_value(event.state_key()) + .map_err(ConflictableTransactionError::Abort)?, )?; invited.remove(self.encode_key(INVITED_USER_ID, &key))?; } MembershipState::Invite => { invited.insert( self.encode_key(INVITED_USER_ID, &key), - event.state_key().as_str(), + self.serialize_value(event.state_key()) + .map_err(ConflictableTransactionError::Abort)?, )?; joined.remove(self.encode_key(JOINED_USER_ID, &key))?; } @@ -480,7 +483,7 @@ impl SledStore { members.insert( self.encode_key(MEMBER, &key), - self.serialize_event(&event) + self.serialize_value(&event) .map_err(ConflictableTransactionError::Abort)?, )?; @@ -489,7 +492,7 @@ impl SledStore { { profiles.insert( self.encode_key(PROFILE, &key), - self.serialize_event(&profile) + self.serialize_value(&profile) .map_err(ConflictableTransactionError::Abort)?, )?; } @@ -500,7 +503,7 @@ impl SledStore { for (display_name, map) in ambiguity_maps { display_names.insert( self.encode_key(DISPLAY_NAME, (room_id, display_name)), - self.serialize_event(&map) + self.serialize_value(&map) .map_err(ConflictableTransactionError::Abort)?, )?; } @@ -510,7 +513,7 @@ impl SledStore { for (event_type, event) in events { room_account_data.insert( self.encode_key(ROOM_ACCOUNT_DATA, &(room, event_type)), - self.serialize_event(&event) + self.serialize_value(&event) .map_err(ConflictableTransactionError::Abort)?, )?; } @@ -521,7 +524,7 @@ impl SledStore { for (state_key, event) in events { state.insert( self.encode_key(ROOM_STATE, (room, event_type, state_key)), - self.serialize_event(&event) + self.serialize_value(&event) .map_err(ConflictableTransactionError::Abort)?, )?; } @@ -531,7 +534,7 @@ impl SledStore { for (room_id, room_info) in &changes.room_infos { rooms.insert( self.encode_key(ROOM, room_id), - self.serialize_event(room_info) + self.serialize_value(room_info) .map_err(ConflictableTransactionError::Abort)?, )?; } @@ -539,7 +542,7 @@ impl SledStore { for (room_id, info) in &changes.stripped_room_infos { striped_rooms.insert( self.encode_key(STRIPPED_ROOM_INFO, room_id), - self.serialize_event(&info) + self.serialize_value(&info) .map_err(ConflictableTransactionError::Abort)?, )?; } @@ -552,7 +555,8 @@ impl SledStore { MembershipState::Join => { stripped_joined.insert( self.encode_key(STRIPPED_JOINED_USER_ID, &key), - event.state_key.as_str(), + self.serialize_value(&event.state_key) + .map_err(ConflictableTransactionError::Abort)?, )?; stripped_invited .remove(self.encode_key(STRIPPED_INVITED_USER_ID, &key))?; @@ -560,7 +564,8 @@ impl SledStore { MembershipState::Invite => { stripped_invited.insert( self.encode_key(STRIPPED_INVITED_USER_ID, &key), - event.state_key.as_str(), + self.serialize_value(&event.state_key) + .map_err(ConflictableTransactionError::Abort)?, )?; stripped_joined .remove(self.encode_key(STRIPPED_JOINED_USER_ID, &key))?; @@ -574,7 +579,7 @@ impl SledStore { } stripped_members.insert( self.encode_key(STRIPPED_ROOM_MEMBER, &key), - self.serialize_event(&event) + self.serialize_value(&event) .map_err(ConflictableTransactionError::Abort)?, )?; } @@ -588,7 +593,7 @@ impl SledStore { STRIPPED_ROOM_STATE, (room, event_type.to_string(), state_key), ), - self.serialize_event(&event) + self.serialize_value(&event) .map_err(ConflictableTransactionError::Abort)?, )?; } @@ -614,12 +619,12 @@ impl SledStore { ROOM_USER_RECEIPT, (room, receipt_type, user_id), ), - self.serialize_event(&(event_id, receipt)) + self.serialize_value(&(event_id, receipt)) .map_err(ConflictableTransactionError::Abort)?, )? { // Remove the old receipt from the room event receipts let (old_event, _): (OwnedEventId, Receipt) = self - .deserialize_event(&old) + .deserialize_value(&old) .map_err(ConflictableTransactionError::Abort)?; room_event_receipts.remove(self.encode_key( ROOM_EVENT_RECEIPT, @@ -633,7 +638,7 @@ impl SledStore { ROOM_EVENT_RECEIPT, (room, receipt_type, event_id, user_id), ), - self.serialize_event(&(user_id, receipt)) + self.serialize_value(&(user_id, receipt)) .map_err(ConflictableTransactionError::Abort)?, )?; } @@ -644,7 +649,7 @@ impl SledStore { for (sender, event) in &changes.presence { presence.insert( self.encode_key(PRESENCE, sender), - self.serialize_event(&event) + self.serialize_value(&event) .map_err(ConflictableTransactionError::Abort)?, )?; } @@ -662,13 +667,16 @@ impl SledStore { let ret: Result<(), TransactionError> = (&self.session, &self.account_data) .transaction(|(session, account_data)| { if let Some(s) = &changes.sync_token { - session.insert("sync_token".encode(), s.as_str())?; + session.insert( + SYNC_TOKEN.encode(), + self.serialize_value(s).map_err(ConflictableTransactionError::Abort)?, + )?; } for (event_type, event) in &changes.account_data { account_data.insert( self.encode_key(ACCOUNT_DATA, event_type), - self.serialize_event(&event) + self.serialize_value(&event) .map_err(ConflictableTransactionError::Abort)?, )?; } @@ -688,7 +696,7 @@ impl SledStore { pub async fn get_presence_event(&self, user_id: &UserId) -> Result>> { let db = self.clone(); let key = self.encode_key(PRESENCE, user_id); - spawn_blocking(move || db.presence.get(key)?.map(|e| db.deserialize_event(&e)).transpose()) + spawn_blocking(move || db.presence.get(key)?.map(|e| db.deserialize_value(&e)).transpose()) .await? } @@ -701,7 +709,7 @@ impl SledStore { let db = self.clone(); let key = self.encode_key(ROOM_STATE, (room_id, event_type.to_string(), state_key)); spawn_blocking(move || { - db.room_state.get(key)?.map(|e| db.deserialize_event(&e)).transpose() + db.room_state.get(key)?.map(|e| db.deserialize_value(&e)).transpose() }) .await? } @@ -716,7 +724,7 @@ impl SledStore { spawn_blocking(move || { db.room_state .scan_prefix(key) - .flat_map(|e| e.map(|(_, e)| db.deserialize_event(&e))) + .flat_map(|e| e.map(|(_, e)| db.deserialize_value(&e))) .collect::>() }) .await? @@ -729,7 +737,7 @@ impl SledStore { ) -> Result>> { let db = self.clone(); let key = self.encode_key(PROFILE, (room_id, user_id)); - spawn_blocking(move || db.profiles.get(key)?.map(|p| db.deserialize_event(&p)).transpose()) + spawn_blocking(move || db.profiles.get(key)?.map(|p| db.deserialize_value(&p)).transpose()) .await? } @@ -742,12 +750,12 @@ impl SledStore { let key = self.encode_key(MEMBER, (room_id, state_key)); let stripped_key = self.encode_key(STRIPPED_ROOM_MEMBER, (room_id, state_key)); spawn_blocking(move || { - if let Some(e) = db.members.get(key)?.map(|v| db.deserialize_event(&v)).transpose()? { + if let Some(e) = db.members.get(key)?.map(|v| db.deserialize_value(&v)).transpose()? { Ok(Some(MemberEvent::Sync(e))) } else if let Some(e) = db .stripped_members .get(stripped_key)? - .map(|v| db.deserialize_event(&v)) + .map(|v| db.deserialize_value(&v)) .transpose()? { Ok(Some(MemberEvent::Stripped(e))) @@ -784,9 +792,9 @@ impl SledStore { let db = self.clone(); let key = self.encode_key(INVITED_USER_ID, room_id); spawn_blocking(move || { - stream::iter(db.invited_user_ids.scan_prefix(key).map(|u| { - UserId::parse(String::from_utf8_lossy(&u.map_err(StoreError::backend)?.1)) - .map_err(StoreError::Identifier) + stream::iter(db.invited_user_ids.scan_prefix(key).map(move |u| { + db.deserialize_value(&u.map_err(StoreError::backend)?.1) + .map_err(StoreError::backend) })) }) .await @@ -800,9 +808,9 @@ impl SledStore { let db = self.clone(); let key = self.encode_key(JOINED_USER_ID, room_id); spawn_blocking(move || { - stream::iter(db.joined_user_ids.scan_prefix(key).map(|u| { - UserId::parse(String::from_utf8_lossy(&u.map_err(StoreError::backend)?.1)) - .map_err(StoreError::Identifier) + stream::iter(db.joined_user_ids.scan_prefix(key).map(move |u| { + db.deserialize_value(&u.map_err(StoreError::backend)?.1) + .map_err(StoreError::backend) })) }) .await @@ -816,9 +824,9 @@ impl SledStore { let db = self.clone(); let key = self.encode_key(STRIPPED_INVITED_USER_ID, room_id); spawn_blocking(move || { - stream::iter(db.stripped_invited_user_ids.scan_prefix(key).map(|u| { - UserId::parse(String::from_utf8_lossy(&u.map_err(StoreError::backend)?.1)) - .map_err(StoreError::Identifier) + stream::iter(db.stripped_invited_user_ids.scan_prefix(key).map(move |u| { + db.deserialize_value(&u.map_err(StoreError::backend)?.1) + .map_err(StoreError::backend) })) }) .await @@ -832,9 +840,9 @@ impl SledStore { let db = self.clone(); let key = self.encode_key(STRIPPED_JOINED_USER_ID, room_id); spawn_blocking(move || { - stream::iter(db.stripped_joined_user_ids.scan_prefix(key).map(|u| { - UserId::parse(String::from_utf8_lossy(&u.map_err(StoreError::backend)?.1)) - .map_err(StoreError::Identifier) + stream::iter(db.stripped_joined_user_ids.scan_prefix(key).map(move |u| { + db.deserialize_value(&u.map_err(StoreError::backend)?.1) + .map_err(StoreError::backend) })) }) .await @@ -844,7 +852,7 @@ impl SledStore { pub async fn get_room_infos(&self) -> Result>> { let db = self.clone(); spawn_blocking(move || { - stream::iter(db.room_info.iter().map(move |r| db.deserialize_event(&r?.1))) + stream::iter(db.room_info.iter().map(move |r| db.deserialize_value(&r?.1))) }) .await .map_err(Into::into) @@ -853,7 +861,7 @@ impl SledStore { pub async fn get_stripped_room_infos(&self) -> Result>> { let db = self.clone(); spawn_blocking(move || { - stream::iter(db.stripped_room_infos.iter().map(move |r| db.deserialize_event(&r?.1))) + stream::iter(db.stripped_room_infos.iter().map(move |r| db.deserialize_value(&r?.1))) }) .await .map_err(Into::into) @@ -870,7 +878,7 @@ impl SledStore { Ok(db .display_names .get(key)? - .map(|m| db.deserialize_event(&m)) + .map(|m| db.deserialize_value(&m)) .transpose()? .unwrap_or_default()) }) @@ -884,7 +892,7 @@ impl SledStore { let db = self.clone(); let key = self.encode_key(ACCOUNT_DATA, event_type); spawn_blocking(move || { - db.account_data.get(key)?.map(|m| db.deserialize_event(&m)).transpose() + db.account_data.get(key)?.map(|m| db.deserialize_value(&m)).transpose() }) .await? } @@ -897,7 +905,7 @@ impl SledStore { let db = self.clone(); let key = self.encode_key(ROOM_ACCOUNT_DATA, (room_id, event_type)); spawn_blocking(move || { - db.room_account_data.get(key)?.map(|m| db.deserialize_event(&m)).transpose() + db.room_account_data.get(key)?.map(|m| db.deserialize_value(&m)).transpose() }) .await? } @@ -911,7 +919,7 @@ impl SledStore { let db = self.clone(); let key = self.encode_key(ROOM_USER_RECEIPT, (room_id, receipt_type, user_id)); spawn_blocking(move || { - db.room_user_receipts.get(key)?.map(|m| db.deserialize_event(&m)).transpose() + db.room_user_receipts.get(key)?.map(|m| db.deserialize_value(&m)).transpose() }) .await? } @@ -930,7 +938,7 @@ impl SledStore { .values() .map(|u| { let v = u.map_err(StoreError::backend)?; - db.deserialize_event(&v).map_err(StoreError::backend) + db.deserialize_value(&v).map_err(StoreError::backend) }) .collect() }) @@ -941,7 +949,7 @@ impl SledStore { async fn add_media_content(&self, request: &MediaRequest, data: Vec) -> Result<()> { self.media.insert( self.encode_key(MEDIA, (request.source.unique_key(), request.format.unique_key())), - data, + self.serialize_value(&data)?, )?; self.inner.flush_async().await?; @@ -954,20 +962,31 @@ impl SledStore { let key = self.encode_key(MEDIA, (request.source.unique_key(), request.format.unique_key())); - spawn_blocking(move || Ok(db.media.get(key)?.map(|m| m.to_vec()))).await? + spawn_blocking(move || { + db.media.get(key)?.map(move |m| db.deserialize_value(&m)).transpose() + }) + .await? } async fn get_custom_value(&self, key: &[u8]) -> Result>> { let custom = self.custom.clone(); - let key = key.to_owned(); - spawn_blocking(move || Ok(custom.get(key)?.map(|v| v.to_vec()))).await? + let me = self.clone(); + let key = self.encode_key(CUSTOM, EncodeUnchecked::from(key)); + spawn_blocking(move || custom.get(key)?.map(move |v| me.deserialize_value(&v)).transpose()) + .await? } async fn set_custom_value(&self, key: &[u8], value: Vec) -> Result>> { - let ret = self.custom.insert(key, value)?.map(|v| v.to_vec()); + let key = self.encode_key(CUSTOM, EncodeUnchecked::from(key)); + let me = self.clone(); + let ret = self + .custom + .insert(key, me.serialize_value(&value)?)? + .map(|v| me.deserialize_value(&v)) + .transpose(); self.inner.flush_async().await?; - Ok(ret) + ret } async fn remove_media_content(&self, request: &MediaRequest) -> Result<()> { @@ -1133,7 +1152,7 @@ impl SledStore { let metadata: Option = db .room_timeline_metadata .get(key.as_slice())? - .map(|v| serde_json::from_slice(&v).map_err(StoreError::Json)) + .map(|v| self.deserialize_value(&v)) .transpose()?; let metadata = match metadata { Some(m) => m, @@ -1155,7 +1174,7 @@ impl SledStore { let stream = stream! { while let Ok(Some(item)) = db.room_timeline.get(&db.encode_key_with_counter(TIMELINE, &r_id, position)) { position += 1; - yield db.deserialize_event(&item).map_err(SledStoreError::from).map_err(|e| e.into()); + yield db.deserialize_value(&item).map_err(SledStoreError::from).map_err(|e| e.into()); } }; @@ -1223,7 +1242,7 @@ impl SledStore { let metadata: Option = self .room_timeline_metadata .get(self.encode_key(TIMELINE_METADATA, &room_id))? - .map(|v| serde_json::from_slice(&v).map_err(StoreError::Json)) + .map(|item| self.deserialize_value(&item)) .transpose()?; if let Some(mut metadata) = metadata { if !timeline.sync && Some(&timeline.start) != metadata.end.as_ref() { @@ -1279,7 +1298,7 @@ impl SledStore { let room_version = self .room_info .get(&self.encode_key(ROOM_INFO, room_id))? - .map(|r| self.deserialize_event::(&r)) + .map(|r| self.deserialize_value::(&r)) .transpose()? .and_then(|info| info.room_version().cloned()) .unwrap_or_else(|| { @@ -1308,7 +1327,7 @@ impl SledStore { .room_timeline .get(position_key.as_ref())? .map(|e| { - self.deserialize_event::(&e) + self.deserialize_value::(&e) .map_err(SledStoreError::from) }) .transpose()? @@ -1319,7 +1338,7 @@ impl SledStore { .map_err(StoreError::Redaction)?; full_event.event = Raw::new(&event_json)?.cast(); timeline_batch - .insert(position_key, self.serialize_event(&full_event)?); + .insert(position_key, self.serialize_value(&full_event)?); } } } @@ -1327,7 +1346,7 @@ impl SledStore { metadata.start_position -= 1; let key = self.encode_key_with_counter(TIMELINE, room_id, metadata.start_position); - timeline_batch.insert(key.as_slice(), self.serialize_event(&event)?); + timeline_batch.insert(key.as_slice(), self.serialize_value(&event)?); // Only add event with id to the position map if let Some(event_id) = event.event_id() { let event_key = @@ -1340,7 +1359,7 @@ impl SledStore { metadata.end_position += 1; let key = self.encode_key_with_counter(TIMELINE, room_id, metadata.end_position); - timeline_batch.insert(key.as_slice(), self.serialize_event(&event)?); + timeline_batch.insert(key.as_slice(), self.serialize_value(&event)?); // Only add event with id to the position map if let Some(event_id) = event.event_id() { let event_key = @@ -1352,7 +1371,7 @@ impl SledStore { timeline_metadata_batch.insert( self.encode_key(TIMELINE_METADATA, &room_id), - serde_json::to_vec(&metadata)?, + self.serialize_value(&metadata)?, ); }