Merge pull request #729 from gnunicorn/gnunicorn/issue609

Ensure all data state stores save is encrypted
This commit is contained in:
Benjamin Kampmann
2022-06-07 12:41:38 +02:00
committed by GitHub
4 changed files with 145 additions and 85 deletions

View File

@@ -358,6 +358,28 @@ macro_rules! statestore_integration_tests {
assert!(!members.is_empty(), "We expected to find members for the room")
}
#[async_test]
async fn test_filter_saving() {
let store = get_store().await.unwrap();
let test_name = "filter_name";
let filter_id = "filter_id_1234";
assert_eq!(store.get_filter(test_name).await.unwrap(), None);
store.save_filter(test_name, filter_id).await.unwrap();
assert_eq!(store.get_filter(test_name).await.unwrap(), Some(filter_id.to_owned()));
}
#[async_test]
async fn test_sync_token_saving() {
let mut changes = StateChanges::default();
let store = get_store().await.unwrap();
let sync_token = "t392-516_47314_0_7_1".to_owned();
changes.sync_token = Some(sync_token.clone());
assert_eq!(store.get_sync_token().await.unwrap(), None);
store.save_changes(&changes).await.unwrap();
assert_eq!(store.get_sync_token().await.unwrap(), Some(sync_token));
}
#[async_test]
async fn test_stripped_member_saving() {
let store = get_store().await.unwrap();
@@ -625,7 +647,7 @@ macro_rules! statestore_integration_tests {
// Add sync response
let sync = SyncResponse::try_from_http_response(
Response::builder().body(serde_json::to_vec(&*test_json::MORE_SYNC).unwrap()).unwrap(),
Response::builder().body(serde_json::to_vec(&*test_json::MORE_SYNC).expect("Parsing MORE_SYNC failed")).unwrap(),
)
.unwrap();
@@ -643,7 +665,7 @@ macro_rules! statestore_integration_tests {
);
let mut changes = StateChanges::new(sync.next_batch.clone());
changes.add_timeline(room_id, timeline_slice);
store.save_changes(&changes).await.unwrap();
store.save_changes(&changes).await.expect("Saving room timeline failed");
check_timeline_events(room_id, &store, &stored_events, timeline.prev_batch.as_deref())
.await;
@@ -651,7 +673,7 @@ macro_rules! statestore_integration_tests {
// Add message response
let messages = MessageResponse::try_from_http_response(
Response::builder()
.body(serde_json::to_vec(&*test_json::SYNC_ROOM_MESSAGES_BATCH_1).unwrap())
.body(serde_json::to_vec(&*test_json::SYNC_ROOM_MESSAGES_BATCH_1).expect("Parsing SYNC_ROOM_MESSAGES_BATCH_1 failed"))
.unwrap(),
)
.unwrap();
@@ -669,14 +691,14 @@ macro_rules! statestore_integration_tests {
TimelineSlice::new(events, messages.start.clone(), messages.end.clone(), false, false);
let mut changes = StateChanges::default();
changes.add_timeline(room_id, timeline_slice);
store.save_changes(&changes).await.unwrap();
store.save_changes(&changes).await.expect("Saving room update timeline failed");
check_timeline_events(room_id, &store, &stored_events, messages.end.as_deref()).await;
// Add second message response
let messages = MessageResponse::try_from_http_response(
Response::builder()
.body(serde_json::to_vec(&*test_json::SYNC_ROOM_MESSAGES_BATCH_2).unwrap())
.body(serde_json::to_vec(&*test_json::SYNC_ROOM_MESSAGES_BATCH_2).expect("Parsing SYNC_ROOM_MESSAGES_BATCH_2 failed"))
.unwrap(),
)
.unwrap();
@@ -694,7 +716,7 @@ macro_rules! statestore_integration_tests {
TimelineSlice::new(events, messages.start.clone(), messages.end.clone(), false, false);
let mut changes = StateChanges::default();
changes.add_timeline(room_id, timeline_slice);
store.save_changes(&changes).await.unwrap();
store.save_changes(&changes).await.expect("Saving room update timeline 2 failed");
check_timeline_events(room_id, &store, &stored_events, messages.end.as_deref()).await;
@@ -722,7 +744,7 @@ macro_rules! statestore_integration_tests {
);
let mut changes = StateChanges::new(sync.next_batch.clone());
changes.add_timeline(room_id, timeline_slice);
store.save_changes(&changes).await.unwrap();
store.save_changes(&changes).await.expect("Saving room update timeline 3 failed");
check_timeline_events(room_id, &store, &stored_events, messages.end.as_deref()).await;
@@ -737,7 +759,7 @@ macro_rules! statestore_integration_tests {
);
let mut changes = StateChanges::default();
changes.add_timeline(room_id, timeline_slice);
store.save_changes(&changes).await.unwrap();
store.save_changes(&changes).await.expect("Saving room update timeline 4 failed");
check_timeline_events(room_id, &store, &Vec::new(), end_token.as_deref()).await;
}

View File

@@ -330,7 +330,7 @@ impl IndexeddbStore {
obj.put_key_val(
&self.encode_key(KEYS::FILTER, (KEYS::FILTER, filter_name)),
&JsValue::from_str(filter_id),
&self.serialize_event(&filter_id)?,
)?;
tx.await.into_result()?;
@@ -339,13 +339,13 @@ impl IndexeddbStore {
}
pub async fn get_filter(&self, filter_name: &str) -> Result<Option<String>> {
Ok(self
.inner
self.inner
.transaction_on_one_with_mode(KEYS::SESSION, IdbTransactionMode::Readonly)?
.object_store(KEYS::SESSION)?
.get(&self.encode_key(KEYS::FILTER, (KEYS::FILTER, filter_name)))?
.await?
.and_then(|f| f.as_string()))
.map(|f| self.deserialize_event(f))
.transpose()
}
pub async fn get_sync_token(&self) -> Result<Option<String>> {
@@ -675,7 +675,7 @@ impl IndexeddbStore {
let metadata: Option<TimelineMetadata> = timeline_metadata_store
.get(&self.encode_key(KEYS::ROOM_TIMELINE_METADATA, room_id))?
.await?
.map(|v| v.into_serde())
.map(|v| self.deserialize_event(&v))
.transpose()?;
if let Some(mut metadata) = metadata {
if !timeline.sync && Some(&timeline.start) != metadata.end.as_ref() {
@@ -834,7 +834,7 @@ impl IndexeddbStore {
timeline_metadata_store.put_key_val_owned(
&self.encode_key(KEYS::ROOM_TIMELINE_METADATA, room_id),
&JsValue::from_serde(&metadata)?,
&self.serialize_event(&metadata)?,
)?;
}
}
@@ -1474,11 +1474,13 @@ mod tests {
wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);
use matrix_sdk_base::statestore_integration_tests;
use uuid::Uuid;
use super::{IndexeddbStore, Result};
async fn get_store() -> Result<IndexeddbStore> {
Ok(IndexeddbStore::open().await?)
let db_name = format!("test-state-plain-{}", Uuid::new_v4().as_hyphenated().to_string());
Ok(IndexeddbStore::open_helper(db_name, None).await?)
}
statestore_integration_tests! { integration }

View File

@@ -11,6 +11,23 @@ use ruma::{
RoomId, TransactionId, UserId,
};
/// Hold any data to be used as an encoding key
/// without checking for the existence of `ENCODE_SEPARATOR` within
pub struct EncodeUnchecked<'a>(&'a [u8]);
impl<'a> EncodeUnchecked<'a> {
/// Wrap any `[u8]`
pub fn from(bytes: &'a [u8]) -> Self {
EncodeUnchecked(bytes)
}
}
impl<'a> EncodeKey for EncodeUnchecked<'a> {
fn encode_as_bytes(&self) -> Cow<'a, [u8]> {
(self.0).into()
}
}
pub const ENCODE_SEPARATOR: u8 = 0xff;
pub trait EncodeKey {

View File

@@ -62,9 +62,9 @@ use tokio::task::spawn_blocking;
#[cfg(feature = "crypto-store")]
use super::OpenStoreError;
use crate::encode_key::EncodeKey;
#[cfg(feature = "experimental-timeline")]
use crate::encode_key::ENCODE_SEPARATOR;
use crate::encode_key::{EncodeKey, EncodeUnchecked};
#[cfg(feature = "crypto-store")]
pub use crate::CryptoStore;
@@ -123,6 +123,7 @@ const VERSION_KEY: &str = "state-store-version";
const ACCOUNT_DATA: &str = "account-data";
const CUSTOM: &str = "custom";
const SYNC_TOKEN: &str = "sync_token";
const DISPLAY_NAME: &str = "display-name";
const INVITED_USER_ID: &str = "invited-user-id";
const JOINED_USER_ID: &str = "joined-user-id";
@@ -362,7 +363,7 @@ impl SledStore {
CryptoStore::open_with_database(self.inner.clone(), self.store_cipher.clone())
}
fn serialize_event(&self, event: &impl Serialize) -> Result<Vec<u8>, SledStoreError> {
fn serialize_value(&self, event: &impl Serialize) -> Result<Vec<u8>, SledStoreError> {
if let Some(key) = &self.store_cipher {
Ok(key.encrypt_value(event)?)
} else {
@@ -370,7 +371,7 @@ impl SledStore {
}
}
fn deserialize_event<T: DeserializeOwned>(&self, event: &[u8]) -> Result<T, SledStoreError> {
fn deserialize_value<T: DeserializeOwned>(&self, event: &[u8]) -> Result<T, SledStoreError> {
if let Some(key) = &self.store_cipher {
Ok(key.decrypt_value(event)?)
} else {
@@ -398,22 +399,22 @@ impl SledStore {
}
pub async fn save_filter(&self, filter_name: &str, filter_id: &str) -> Result<()> {
self.session.insert(self.encode_key(SESSION, ("filter", filter_name)), filter_id)?;
self.session.insert(
self.encode_key(SESSION, ("filter", filter_name)),
self.serialize_value(&filter_id)?,
)?;
Ok(())
}
pub async fn get_filter(&self, filter_name: &str) -> Result<Option<String>> {
Ok(self
.session
self.session
.get(self.encode_key(SESSION, ("filter", filter_name)))?
.map(|f| String::from_utf8_lossy(&f).to_string()))
.map(|f| self.deserialize_value(&f))
.transpose()
}
pub async fn get_sync_token(&self) -> Result<Option<String>> {
Ok(self
.session
.get("sync_token".encode())?
.map(|t| String::from_utf8_lossy(&t).to_string()))
self.session.get(SYNC_TOKEN.encode())?.map(|t| self.deserialize_value(&t)).transpose()
}
pub async fn save_changes(&self, changes: &StateChanges) -> Result<()> {
@@ -461,14 +462,16 @@ impl SledStore {
MembershipState::Join => {
joined.insert(
self.encode_key(JOINED_USER_ID, &key),
event.state_key().as_str(),
self.serialize_value(event.state_key())
.map_err(ConflictableTransactionError::Abort)?,
)?;
invited.remove(self.encode_key(INVITED_USER_ID, &key))?;
}
MembershipState::Invite => {
invited.insert(
self.encode_key(INVITED_USER_ID, &key),
event.state_key().as_str(),
self.serialize_value(event.state_key())
.map_err(ConflictableTransactionError::Abort)?,
)?;
joined.remove(self.encode_key(JOINED_USER_ID, &key))?;
}
@@ -480,7 +483,7 @@ impl SledStore {
members.insert(
self.encode_key(MEMBER, &key),
self.serialize_event(&event)
self.serialize_value(&event)
.map_err(ConflictableTransactionError::Abort)?,
)?;
@@ -489,7 +492,7 @@ impl SledStore {
{
profiles.insert(
self.encode_key(PROFILE, &key),
self.serialize_event(&profile)
self.serialize_value(&profile)
.map_err(ConflictableTransactionError::Abort)?,
)?;
}
@@ -500,7 +503,7 @@ impl SledStore {
for (display_name, map) in ambiguity_maps {
display_names.insert(
self.encode_key(DISPLAY_NAME, (room_id, display_name)),
self.serialize_event(&map)
self.serialize_value(&map)
.map_err(ConflictableTransactionError::Abort)?,
)?;
}
@@ -510,7 +513,7 @@ impl SledStore {
for (event_type, event) in events {
room_account_data.insert(
self.encode_key(ROOM_ACCOUNT_DATA, &(room, event_type)),
self.serialize_event(&event)
self.serialize_value(&event)
.map_err(ConflictableTransactionError::Abort)?,
)?;
}
@@ -521,7 +524,7 @@ impl SledStore {
for (state_key, event) in events {
state.insert(
self.encode_key(ROOM_STATE, (room, event_type, state_key)),
self.serialize_event(&event)
self.serialize_value(&event)
.map_err(ConflictableTransactionError::Abort)?,
)?;
}
@@ -531,7 +534,7 @@ impl SledStore {
for (room_id, room_info) in &changes.room_infos {
rooms.insert(
self.encode_key(ROOM, room_id),
self.serialize_event(room_info)
self.serialize_value(room_info)
.map_err(ConflictableTransactionError::Abort)?,
)?;
}
@@ -539,7 +542,7 @@ impl SledStore {
for (room_id, info) in &changes.stripped_room_infos {
striped_rooms.insert(
self.encode_key(STRIPPED_ROOM_INFO, room_id),
self.serialize_event(&info)
self.serialize_value(&info)
.map_err(ConflictableTransactionError::Abort)?,
)?;
}
@@ -552,7 +555,8 @@ impl SledStore {
MembershipState::Join => {
stripped_joined.insert(
self.encode_key(STRIPPED_JOINED_USER_ID, &key),
event.state_key.as_str(),
self.serialize_value(&event.state_key)
.map_err(ConflictableTransactionError::Abort)?,
)?;
stripped_invited
.remove(self.encode_key(STRIPPED_INVITED_USER_ID, &key))?;
@@ -560,7 +564,8 @@ impl SledStore {
MembershipState::Invite => {
stripped_invited.insert(
self.encode_key(STRIPPED_INVITED_USER_ID, &key),
event.state_key.as_str(),
self.serialize_value(&event.state_key)
.map_err(ConflictableTransactionError::Abort)?,
)?;
stripped_joined
.remove(self.encode_key(STRIPPED_JOINED_USER_ID, &key))?;
@@ -574,7 +579,7 @@ impl SledStore {
}
stripped_members.insert(
self.encode_key(STRIPPED_ROOM_MEMBER, &key),
self.serialize_event(&event)
self.serialize_value(&event)
.map_err(ConflictableTransactionError::Abort)?,
)?;
}
@@ -588,7 +593,7 @@ impl SledStore {
STRIPPED_ROOM_STATE,
(room, event_type.to_string(), state_key),
),
self.serialize_event(&event)
self.serialize_value(&event)
.map_err(ConflictableTransactionError::Abort)?,
)?;
}
@@ -614,12 +619,12 @@ impl SledStore {
ROOM_USER_RECEIPT,
(room, receipt_type, user_id),
),
self.serialize_event(&(event_id, receipt))
self.serialize_value(&(event_id, receipt))
.map_err(ConflictableTransactionError::Abort)?,
)? {
// Remove the old receipt from the room event receipts
let (old_event, _): (OwnedEventId, Receipt) = self
.deserialize_event(&old)
.deserialize_value(&old)
.map_err(ConflictableTransactionError::Abort)?;
room_event_receipts.remove(self.encode_key(
ROOM_EVENT_RECEIPT,
@@ -633,7 +638,7 @@ impl SledStore {
ROOM_EVENT_RECEIPT,
(room, receipt_type, event_id, user_id),
),
self.serialize_event(&(user_id, receipt))
self.serialize_value(&(user_id, receipt))
.map_err(ConflictableTransactionError::Abort)?,
)?;
}
@@ -644,7 +649,7 @@ impl SledStore {
for (sender, event) in &changes.presence {
presence.insert(
self.encode_key(PRESENCE, sender),
self.serialize_event(&event)
self.serialize_value(&event)
.map_err(ConflictableTransactionError::Abort)?,
)?;
}
@@ -662,13 +667,16 @@ impl SledStore {
let ret: Result<(), TransactionError<SledStoreError>> = (&self.session, &self.account_data)
.transaction(|(session, account_data)| {
if let Some(s) = &changes.sync_token {
session.insert("sync_token".encode(), s.as_str())?;
session.insert(
SYNC_TOKEN.encode(),
self.serialize_value(s).map_err(ConflictableTransactionError::Abort)?,
)?;
}
for (event_type, event) in &changes.account_data {
account_data.insert(
self.encode_key(ACCOUNT_DATA, event_type),
self.serialize_event(&event)
self.serialize_value(&event)
.map_err(ConflictableTransactionError::Abort)?,
)?;
}
@@ -688,7 +696,7 @@ impl SledStore {
pub async fn get_presence_event(&self, user_id: &UserId) -> Result<Option<Raw<PresenceEvent>>> {
let db = self.clone();
let key = self.encode_key(PRESENCE, user_id);
spawn_blocking(move || db.presence.get(key)?.map(|e| db.deserialize_event(&e)).transpose())
spawn_blocking(move || db.presence.get(key)?.map(|e| db.deserialize_value(&e)).transpose())
.await?
}
@@ -701,7 +709,7 @@ impl SledStore {
let db = self.clone();
let key = self.encode_key(ROOM_STATE, (room_id, event_type.to_string(), state_key));
spawn_blocking(move || {
db.room_state.get(key)?.map(|e| db.deserialize_event(&e)).transpose()
db.room_state.get(key)?.map(|e| db.deserialize_value(&e)).transpose()
})
.await?
}
@@ -716,7 +724,7 @@ impl SledStore {
spawn_blocking(move || {
db.room_state
.scan_prefix(key)
.flat_map(|e| e.map(|(_, e)| db.deserialize_event(&e)))
.flat_map(|e| e.map(|(_, e)| db.deserialize_value(&e)))
.collect::<Result<_, _>>()
})
.await?
@@ -729,7 +737,7 @@ impl SledStore {
) -> Result<Option<MinimalStateEvent<RoomMemberEventContent>>> {
let db = self.clone();
let key = self.encode_key(PROFILE, (room_id, user_id));
spawn_blocking(move || db.profiles.get(key)?.map(|p| db.deserialize_event(&p)).transpose())
spawn_blocking(move || db.profiles.get(key)?.map(|p| db.deserialize_value(&p)).transpose())
.await?
}
@@ -742,12 +750,12 @@ impl SledStore {
let key = self.encode_key(MEMBER, (room_id, state_key));
let stripped_key = self.encode_key(STRIPPED_ROOM_MEMBER, (room_id, state_key));
spawn_blocking(move || {
if let Some(e) = db.members.get(key)?.map(|v| db.deserialize_event(&v)).transpose()? {
if let Some(e) = db.members.get(key)?.map(|v| db.deserialize_value(&v)).transpose()? {
Ok(Some(MemberEvent::Sync(e)))
} else if let Some(e) = db
.stripped_members
.get(stripped_key)?
.map(|v| db.deserialize_event(&v))
.map(|v| db.deserialize_value(&v))
.transpose()?
{
Ok(Some(MemberEvent::Stripped(e)))
@@ -784,9 +792,9 @@ impl SledStore {
let db = self.clone();
let key = self.encode_key(INVITED_USER_ID, room_id);
spawn_blocking(move || {
stream::iter(db.invited_user_ids.scan_prefix(key).map(|u| {
UserId::parse(String::from_utf8_lossy(&u.map_err(StoreError::backend)?.1))
.map_err(StoreError::Identifier)
stream::iter(db.invited_user_ids.scan_prefix(key).map(move |u| {
db.deserialize_value(&u.map_err(StoreError::backend)?.1)
.map_err(StoreError::backend)
}))
})
.await
@@ -800,9 +808,9 @@ impl SledStore {
let db = self.clone();
let key = self.encode_key(JOINED_USER_ID, room_id);
spawn_blocking(move || {
stream::iter(db.joined_user_ids.scan_prefix(key).map(|u| {
UserId::parse(String::from_utf8_lossy(&u.map_err(StoreError::backend)?.1))
.map_err(StoreError::Identifier)
stream::iter(db.joined_user_ids.scan_prefix(key).map(move |u| {
db.deserialize_value(&u.map_err(StoreError::backend)?.1)
.map_err(StoreError::backend)
}))
})
.await
@@ -816,9 +824,9 @@ impl SledStore {
let db = self.clone();
let key = self.encode_key(STRIPPED_INVITED_USER_ID, room_id);
spawn_blocking(move || {
stream::iter(db.stripped_invited_user_ids.scan_prefix(key).map(|u| {
UserId::parse(String::from_utf8_lossy(&u.map_err(StoreError::backend)?.1))
.map_err(StoreError::Identifier)
stream::iter(db.stripped_invited_user_ids.scan_prefix(key).map(move |u| {
db.deserialize_value(&u.map_err(StoreError::backend)?.1)
.map_err(StoreError::backend)
}))
})
.await
@@ -832,9 +840,9 @@ impl SledStore {
let db = self.clone();
let key = self.encode_key(STRIPPED_JOINED_USER_ID, room_id);
spawn_blocking(move || {
stream::iter(db.stripped_joined_user_ids.scan_prefix(key).map(|u| {
UserId::parse(String::from_utf8_lossy(&u.map_err(StoreError::backend)?.1))
.map_err(StoreError::Identifier)
stream::iter(db.stripped_joined_user_ids.scan_prefix(key).map(move |u| {
db.deserialize_value(&u.map_err(StoreError::backend)?.1)
.map_err(StoreError::backend)
}))
})
.await
@@ -844,7 +852,7 @@ impl SledStore {
pub async fn get_room_infos(&self) -> Result<impl Stream<Item = Result<RoomInfo>>> {
let db = self.clone();
spawn_blocking(move || {
stream::iter(db.room_info.iter().map(move |r| db.deserialize_event(&r?.1)))
stream::iter(db.room_info.iter().map(move |r| db.deserialize_value(&r?.1)))
})
.await
.map_err(Into::into)
@@ -853,7 +861,7 @@ impl SledStore {
pub async fn get_stripped_room_infos(&self) -> Result<impl Stream<Item = Result<RoomInfo>>> {
let db = self.clone();
spawn_blocking(move || {
stream::iter(db.stripped_room_infos.iter().map(move |r| db.deserialize_event(&r?.1)))
stream::iter(db.stripped_room_infos.iter().map(move |r| db.deserialize_value(&r?.1)))
})
.await
.map_err(Into::into)
@@ -870,7 +878,7 @@ impl SledStore {
Ok(db
.display_names
.get(key)?
.map(|m| db.deserialize_event(&m))
.map(|m| db.deserialize_value(&m))
.transpose()?
.unwrap_or_default())
})
@@ -884,7 +892,7 @@ impl SledStore {
let db = self.clone();
let key = self.encode_key(ACCOUNT_DATA, event_type);
spawn_blocking(move || {
db.account_data.get(key)?.map(|m| db.deserialize_event(&m)).transpose()
db.account_data.get(key)?.map(|m| db.deserialize_value(&m)).transpose()
})
.await?
}
@@ -897,7 +905,7 @@ impl SledStore {
let db = self.clone();
let key = self.encode_key(ROOM_ACCOUNT_DATA, (room_id, event_type));
spawn_blocking(move || {
db.room_account_data.get(key)?.map(|m| db.deserialize_event(&m)).transpose()
db.room_account_data.get(key)?.map(|m| db.deserialize_value(&m)).transpose()
})
.await?
}
@@ -911,7 +919,7 @@ impl SledStore {
let db = self.clone();
let key = self.encode_key(ROOM_USER_RECEIPT, (room_id, receipt_type, user_id));
spawn_blocking(move || {
db.room_user_receipts.get(key)?.map(|m| db.deserialize_event(&m)).transpose()
db.room_user_receipts.get(key)?.map(|m| db.deserialize_value(&m)).transpose()
})
.await?
}
@@ -930,7 +938,7 @@ impl SledStore {
.values()
.map(|u| {
let v = u.map_err(StoreError::backend)?;
db.deserialize_event(&v).map_err(StoreError::backend)
db.deserialize_value(&v).map_err(StoreError::backend)
})
.collect()
})
@@ -941,7 +949,7 @@ impl SledStore {
async fn add_media_content(&self, request: &MediaRequest, data: Vec<u8>) -> Result<()> {
self.media.insert(
self.encode_key(MEDIA, (request.source.unique_key(), request.format.unique_key())),
data,
self.serialize_value(&data)?,
)?;
self.inner.flush_async().await?;
@@ -954,20 +962,31 @@ impl SledStore {
let key =
self.encode_key(MEDIA, (request.source.unique_key(), request.format.unique_key()));
spawn_blocking(move || Ok(db.media.get(key)?.map(|m| m.to_vec()))).await?
spawn_blocking(move || {
db.media.get(key)?.map(move |m| db.deserialize_value(&m)).transpose()
})
.await?
}
async fn get_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
let custom = self.custom.clone();
let key = key.to_owned();
spawn_blocking(move || Ok(custom.get(key)?.map(|v| v.to_vec()))).await?
let me = self.clone();
let key = self.encode_key(CUSTOM, EncodeUnchecked::from(key));
spawn_blocking(move || custom.get(key)?.map(move |v| me.deserialize_value(&v)).transpose())
.await?
}
async fn set_custom_value(&self, key: &[u8], value: Vec<u8>) -> Result<Option<Vec<u8>>> {
let ret = self.custom.insert(key, value)?.map(|v| v.to_vec());
let key = self.encode_key(CUSTOM, EncodeUnchecked::from(key));
let me = self.clone();
let ret = self
.custom
.insert(key, me.serialize_value(&value)?)?
.map(|v| me.deserialize_value(&v))
.transpose();
self.inner.flush_async().await?;
Ok(ret)
ret
}
async fn remove_media_content(&self, request: &MediaRequest) -> Result<()> {
@@ -1133,7 +1152,7 @@ impl SledStore {
let metadata: Option<TimelineMetadata> = db
.room_timeline_metadata
.get(key.as_slice())?
.map(|v| serde_json::from_slice(&v).map_err(StoreError::Json))
.map(|v| self.deserialize_value(&v))
.transpose()?;
let metadata = match metadata {
Some(m) => m,
@@ -1155,7 +1174,7 @@ impl SledStore {
let stream = stream! {
while let Ok(Some(item)) = db.room_timeline.get(&db.encode_key_with_counter(TIMELINE, &r_id, position)) {
position += 1;
yield db.deserialize_event(&item).map_err(SledStoreError::from).map_err(|e| e.into());
yield db.deserialize_value(&item).map_err(SledStoreError::from).map_err(|e| e.into());
}
};
@@ -1223,7 +1242,7 @@ impl SledStore {
let metadata: Option<TimelineMetadata> = self
.room_timeline_metadata
.get(self.encode_key(TIMELINE_METADATA, &room_id))?
.map(|v| serde_json::from_slice(&v).map_err(StoreError::Json))
.map(|item| self.deserialize_value(&item))
.transpose()?;
if let Some(mut metadata) = metadata {
if !timeline.sync && Some(&timeline.start) != metadata.end.as_ref() {
@@ -1279,7 +1298,7 @@ impl SledStore {
let room_version = self
.room_info
.get(&self.encode_key(ROOM_INFO, room_id))?
.map(|r| self.deserialize_event::<RoomInfo>(&r))
.map(|r| self.deserialize_value::<RoomInfo>(&r))
.transpose()?
.and_then(|info| info.room_version().cloned())
.unwrap_or_else(|| {
@@ -1308,7 +1327,7 @@ impl SledStore {
.room_timeline
.get(position_key.as_ref())?
.map(|e| {
self.deserialize_event::<SyncRoomEvent>(&e)
self.deserialize_value::<SyncRoomEvent>(&e)
.map_err(SledStoreError::from)
})
.transpose()?
@@ -1319,7 +1338,7 @@ impl SledStore {
.map_err(StoreError::Redaction)?;
full_event.event = Raw::new(&event_json)?.cast();
timeline_batch
.insert(position_key, self.serialize_event(&full_event)?);
.insert(position_key, self.serialize_value(&full_event)?);
}
}
}
@@ -1327,7 +1346,7 @@ impl SledStore {
metadata.start_position -= 1;
let key =
self.encode_key_with_counter(TIMELINE, room_id, metadata.start_position);
timeline_batch.insert(key.as_slice(), self.serialize_event(&event)?);
timeline_batch.insert(key.as_slice(), self.serialize_value(&event)?);
// Only add event with id to the position map
if let Some(event_id) = event.event_id() {
let event_key =
@@ -1340,7 +1359,7 @@ impl SledStore {
metadata.end_position += 1;
let key =
self.encode_key_with_counter(TIMELINE, room_id, metadata.end_position);
timeline_batch.insert(key.as_slice(), self.serialize_event(&event)?);
timeline_batch.insert(key.as_slice(), self.serialize_value(&event)?);
// Only add event with id to the position map
if let Some(event_id) = event.event_id() {
let event_key =
@@ -1352,7 +1371,7 @@ impl SledStore {
timeline_metadata_batch.insert(
self.encode_key(TIMELINE_METADATA, &room_id),
serde_json::to_vec(&metadata)?,
self.serialize_value(&metadata)?,
);
}