From fb3edeb2069bbbbafb0cc95bf2e3fd001acbb493 Mon Sep 17 00:00:00 2001 From: Julian Sparber Date: Thu, 17 Feb 2022 18:32:07 +0100 Subject: [PATCH] store: Save timeline to MemoryStore --- crates/matrix-sdk-base/Cargo.toml | 1 + .../matrix-sdk-base/src/store/memory_store.rs | 159 +++++++++++++++++- 2 files changed, 153 insertions(+), 7 deletions(-) diff --git a/crates/matrix-sdk-base/Cargo.toml b/crates/matrix-sdk-base/Cargo.toml index fc0881e5c..b7eaff9b9 100644 --- a/crates/matrix-sdk-base/Cargo.toml +++ b/crates/matrix-sdk-base/Cargo.toml @@ -34,6 +34,7 @@ indexeddb_state_store = ["indexed_db_futures", "wasm-bindgen", "pbkdf2", "hmac", indexeddb_cryptostore = ["matrix-sdk-crypto/indexeddb_cryptostore"] [dependencies] +async-stream = "0.3.2" chacha20poly1305 = { version = "0.9.0", optional = true } dashmap = "4.0.2" futures-core = "0.3.15" diff --git a/crates/matrix-sdk-base/src/store/memory_store.rs b/crates/matrix-sdk-base/src/store/memory_store.rs index b3033defc..2120ddf50 100644 --- a/crates/matrix-sdk-base/src/store/memory_store.rs +++ b/crates/matrix-sdk-base/src/store/memory_store.rs @@ -13,10 +13,11 @@ // limitations under the License. use std::{ - collections::BTreeSet, + collections::{BTreeMap, BTreeSet, HashMap}, sync::{Arc, RwLock}, }; +use async_stream::stream; use dashmap::{DashMap, DashSet}; use futures_core::stream::BoxStream; use lru::LruCache; @@ -28,14 +29,14 @@ use ruma::{ receipt::Receipt, room::member::{MembershipState, RoomMemberEventContent}, AnyGlobalAccountDataEvent, AnyRoomAccountDataEvent, AnyStrippedStateEvent, - AnySyncStateEvent, EventType, + AnySyncMessageEvent, AnySyncRoomEvent, AnySyncStateEvent, EventType, Redact, }, receipt::ReceiptType, serde::Raw, - EventId, MxcUri, RoomId, UserId, + EventId, MxcUri, RoomId, RoomVersionId, UserId, }; #[allow(unused_imports)] -use tracing::info; +use tracing::{info, warn}; use super::{Result, RoomInfo, StateChanges, StateStore}; use crate::{ @@ -69,6 +70,7 @@ pub struct MemoryStore { >, media: Arc>>>, custom: Arc, Vec>>, + room_timeline: Arc, TimelineData>>, } impl MemoryStore { @@ -94,6 +96,7 @@ impl MemoryStore { room_event_receipts: Default::default(), media: Arc::new(Mutex::new(LruCache::new(100))), custom: DashMap::new().into(), + room_timeline: Default::default(), } } @@ -275,6 +278,116 @@ impl MemoryStore { } } + for (room, timeline) in &changes.timeline { + if timeline.sync { + info!("Save new timeline batch from sync response for {}", room); + } else { + info!("Save new timeline batch from messages response for {}", room); + } + + let data = if timeline.limited { + info!("Delete stored timeline for {} because the sync response was limited", room); + self.room_timeline.remove(room); + None + } else if let Some(mut data) = self.room_timeline.get_mut(room) { + if !timeline.sync && Some(&timeline.start) != data.end.as_ref() { + warn!("Drop unexpected timeline batch for {}", room); + return Ok(()); + } + + // Check if the event already exists in the store + let mut delete_timeline = false; + for event in &timeline.events { + if let Some(event_id) = event.event_id() { + if data.event_id_to_position.contains_key(&event_id) { + delete_timeline = true; + break; + } + } + } + + if delete_timeline { + info!("Delete stored timeline for {} because of duplicated events", room); + self.room_timeline.remove(room); + None + } else if timeline.sync { + data.start = timeline.start.clone(); + Some(data) + } else { + data.end = timeline.end.clone(); + Some(data) + } + } else { + None + }; + + let mut data = &mut *if let Some(data) = data { + data + } else { + let data = TimelineData { + start: timeline.start.clone(), + end: timeline.end.clone(), + ..Default::default() + }; + self.room_timeline.insert(room.to_owned(), data); + self.room_timeline.get_mut(room).unwrap() + }; + + // Create a copy of the events if the stream created via `room_timeline()` isn't + // fully consumed + let data_events = Arc::make_mut(&mut data.events); + + if timeline.sync { + let mut room_version = None; + for event in timeline.events.iter().rev() { + // Redact events already in store only on sync response + if let Ok(AnySyncRoomEvent::Message(AnySyncMessageEvent::RoomRedaction( + redaction, + ))) = event.event.deserialize() + { + if let Some(position) = data.event_id_to_position.get(&redaction.redacts) { + if let Some(mut full_event) = data_events.get_mut(position) { + let inner_event = full_event.event.deserialize()?; + if room_version.is_none() { + room_version = Some(self.room_info + .get(room) + .and_then(|info| { + info.base_info + .create + .as_ref() + .map(|event| event.room_version.clone()) + }).unwrap_or_else(|| { + warn!("Unable to find the room version for {}, assume version 9", room); + RoomVersionId::V9 + })); + } + + full_event.event = Raw::new(&AnySyncRoomEvent::from( + inner_event.redact(redaction, room_version.as_ref().unwrap()), + ))?; + } + } + } + + data.start_position -= 1; + // Only add event with id to the position map + if let Some(event_id) = event.event_id() { + data.event_id_to_position.insert(event_id, data.start_position); + } + data_events.insert(data.start_position, event.to_owned()); + } + } else { + for event in timeline.events.iter() { + data.end_position += 1; + // Only add event with id to the position map + if let Some(event_id) = event.event_id() { + data.event_id_to_position.insert(event_id, data.end_position); + } + data_events.insert(data.end_position, event.to_owned()); + } + } + } + info!("Saved changes in {:?}", now.elapsed()); Ok(()) @@ -456,9 +569,32 @@ impl MemoryStore { self.stripped_members.remove(room_id); self.room_user_receipts.remove(room_id); self.room_event_receipts.remove(room_id); + self.room_timeline.remove(room_id); Ok(()) } + + async fn room_timeline( + &self, + room_id: &RoomId, + ) -> Result>, Option)>> { + if let Some(data) = self.room_timeline.get(room_id) { + let events = data.events.clone(); + let stream = stream! { + for item in events.values() { + yield Ok(item.to_owned()); + } + }; + info!( + "Found previously stored timeline for {}, with end token {:?}", + room_id, data.end + ); + Ok(Some((Box::pin(stream), data.end.to_owned()))) + } else { + info!("No timeline for {} was previously stored", room_id); + Ok(None) + } + } } #[cfg_attr(target_arch = "wasm32", async_trait(?Send))] @@ -612,13 +748,22 @@ impl StateStore for MemoryStore { async fn room_timeline( &self, - _room_id: &RoomId, + room_id: &RoomId, ) -> Result>, Option)>> { - // The `MemoryStore` doesn't cache any events - Ok(None) + self.room_timeline(room_id).await } } +#[derive(Debug, Default)] +struct TimelineData { + pub start: String, + pub start_position: isize, + pub end: Option, + pub end_position: isize, + pub events: Arc>, + pub event_id_to_position: HashMap, isize>, +} + #[cfg(test)] mod test {