store: Save timeline to MemoryStore

This commit is contained in:
Julian Sparber
2022-02-17 18:32:07 +01:00
parent 516aa26589
commit fb3edeb206
2 changed files with 153 additions and 7 deletions

View File

@@ -34,6 +34,7 @@ indexeddb_state_store = ["indexed_db_futures", "wasm-bindgen", "pbkdf2", "hmac",
indexeddb_cryptostore = ["matrix-sdk-crypto/indexeddb_cryptostore"]
[dependencies]
async-stream = "0.3.2"
chacha20poly1305 = { version = "0.9.0", optional = true }
dashmap = "4.0.2"
futures-core = "0.3.15"

View File

@@ -13,10 +13,11 @@
// limitations under the License.
use std::{
collections::BTreeSet,
collections::{BTreeMap, BTreeSet, HashMap},
sync::{Arc, RwLock},
};
use async_stream::stream;
use dashmap::{DashMap, DashSet};
use futures_core::stream::BoxStream;
use lru::LruCache;
@@ -28,14 +29,14 @@ use ruma::{
receipt::Receipt,
room::member::{MembershipState, RoomMemberEventContent},
AnyGlobalAccountDataEvent, AnyRoomAccountDataEvent, AnyStrippedStateEvent,
AnySyncStateEvent, EventType,
AnySyncMessageEvent, AnySyncRoomEvent, AnySyncStateEvent, EventType, Redact,
},
receipt::ReceiptType,
serde::Raw,
EventId, MxcUri, RoomId, UserId,
EventId, MxcUri, RoomId, RoomVersionId, UserId,
};
#[allow(unused_imports)]
use tracing::info;
use tracing::{info, warn};
use super::{Result, RoomInfo, StateChanges, StateStore};
use crate::{
@@ -69,6 +70,7 @@ pub struct MemoryStore {
>,
media: Arc<Mutex<LruCache<String, Vec<u8>>>>,
custom: Arc<DashMap<Vec<u8>, Vec<u8>>>,
room_timeline: Arc<DashMap<Box<RoomId>, TimelineData>>,
}
impl MemoryStore {
@@ -94,6 +96,7 @@ impl MemoryStore {
room_event_receipts: Default::default(),
media: Arc::new(Mutex::new(LruCache::new(100))),
custom: DashMap::new().into(),
room_timeline: Default::default(),
}
}
@@ -275,6 +278,116 @@ impl MemoryStore {
}
}
for (room, timeline) in &changes.timeline {
if timeline.sync {
info!("Save new timeline batch from sync response for {}", room);
} else {
info!("Save new timeline batch from messages response for {}", room);
}
let data = if timeline.limited {
info!("Delete stored timeline for {} because the sync response was limited", room);
self.room_timeline.remove(room);
None
} else if let Some(mut data) = self.room_timeline.get_mut(room) {
if !timeline.sync && Some(&timeline.start) != data.end.as_ref() {
warn!("Drop unexpected timeline batch for {}", room);
return Ok(());
}
// Check if the event already exists in the store
let mut delete_timeline = false;
for event in &timeline.events {
if let Some(event_id) = event.event_id() {
if data.event_id_to_position.contains_key(&event_id) {
delete_timeline = true;
break;
}
}
}
if delete_timeline {
info!("Delete stored timeline for {} because of duplicated events", room);
self.room_timeline.remove(room);
None
} else if timeline.sync {
data.start = timeline.start.clone();
Some(data)
} else {
data.end = timeline.end.clone();
Some(data)
}
} else {
None
};
let mut data = &mut *if let Some(data) = data {
data
} else {
let data = TimelineData {
start: timeline.start.clone(),
end: timeline.end.clone(),
..Default::default()
};
self.room_timeline.insert(room.to_owned(), data);
self.room_timeline.get_mut(room).unwrap()
};
// Create a copy of the events if the stream created via `room_timeline()` isn't
// fully consumed
let data_events = Arc::make_mut(&mut data.events);
if timeline.sync {
let mut room_version = None;
for event in timeline.events.iter().rev() {
// Redact events already in store only on sync response
if let Ok(AnySyncRoomEvent::Message(AnySyncMessageEvent::RoomRedaction(
redaction,
))) = event.event.deserialize()
{
if let Some(position) = data.event_id_to_position.get(&redaction.redacts) {
if let Some(mut full_event) = data_events.get_mut(position) {
let inner_event = full_event.event.deserialize()?;
if room_version.is_none() {
room_version = Some(self.room_info
.get(room)
.and_then(|info| {
info.base_info
.create
.as_ref()
.map(|event| event.room_version.clone())
}).unwrap_or_else(|| {
warn!("Unable to find the room version for {}, assume version 9", room);
RoomVersionId::V9
}));
}
full_event.event = Raw::new(&AnySyncRoomEvent::from(
inner_event.redact(redaction, room_version.as_ref().unwrap()),
))?;
}
}
}
data.start_position -= 1;
// Only add event with id to the position map
if let Some(event_id) = event.event_id() {
data.event_id_to_position.insert(event_id, data.start_position);
}
data_events.insert(data.start_position, event.to_owned());
}
} else {
for event in timeline.events.iter() {
data.end_position += 1;
// Only add event with id to the position map
if let Some(event_id) = event.event_id() {
data.event_id_to_position.insert(event_id, data.end_position);
}
data_events.insert(data.end_position, event.to_owned());
}
}
}
info!("Saved changes in {:?}", now.elapsed());
Ok(())
@@ -456,9 +569,32 @@ impl MemoryStore {
self.stripped_members.remove(room_id);
self.room_user_receipts.remove(room_id);
self.room_event_receipts.remove(room_id);
self.room_timeline.remove(room_id);
Ok(())
}
async fn room_timeline(
&self,
room_id: &RoomId,
) -> Result<Option<(BoxStream<'static, Result<SyncRoomEvent>>, Option<String>)>> {
if let Some(data) = self.room_timeline.get(room_id) {
let events = data.events.clone();
let stream = stream! {
for item in events.values() {
yield Ok(item.to_owned());
}
};
info!(
"Found previously stored timeline for {}, with end token {:?}",
room_id, data.end
);
Ok(Some((Box::pin(stream), data.end.to_owned())))
} else {
info!("No timeline for {} was previously stored", room_id);
Ok(None)
}
}
}
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
@@ -612,13 +748,22 @@ impl StateStore for MemoryStore {
async fn room_timeline(
&self,
_room_id: &RoomId,
room_id: &RoomId,
) -> Result<Option<(BoxStream<'static, Result<SyncRoomEvent>>, Option<String>)>> {
// The `MemoryStore` doesn't cache any events
Ok(None)
self.room_timeline(room_id).await
}
}
#[derive(Debug, Default)]
struct TimelineData {
pub start: String,
pub start_position: isize,
pub end: Option<String>,
pub end_position: isize,
pub events: Arc<BTreeMap<isize, SyncRoomEvent>>,
pub event_id_to_position: HashMap<Box<EventId>, isize>,
}
#[cfg(test)]
mod test {