feat(sdk): Start disabling the global store in EventCache.

This commit is contained in:
Ivan Enderlin
2024-03-18 12:57:45 +01:00
parent 36e199c31e
commit 54729ce32b

View File

@@ -68,10 +68,11 @@ use tokio::{
};
use tracing::{error, instrument, trace, warn};
use self::store::{EventCacheStore, MemoryStore, TimelineEntry};
use crate::{
client::ClientInner, event_cache::store::PaginationToken, room::MessagesOptions, Client, Room,
use self::{
linked_chunk::ChunkContent,
store::{EventCacheStore, Gap, MemoryStore, PaginationToken, RoomEvents, TimelineEntry},
};
use crate::{client::ClientInner, room::MessagesOptions, Client, Room};
mod linked_chunk;
mod store;
@@ -149,15 +150,14 @@ impl Debug for EventCache {
impl EventCache {
/// Create a new [`EventCache`] for the given client.
pub(crate) fn new(client: &Arc<ClientInner>) -> Self {
let store = Arc::new(MemoryStore::new());
let inner = Arc::new(EventCacheInner {
client: Arc::downgrade(client),
by_room: Default::default(),
store: Arc::new(Mutex::new(store)),
drop_handles: Default::default(),
});
Self { inner }
Self {
inner: Arc::new(EventCacheInner {
client: Arc::downgrade(client),
multiple_room_updates_lock: Default::default(),
by_room: Default::default(),
drop_handles: Default::default(),
}),
}
}
/// Starts subscribing the [`EventCache`] to sync responses, if not done
@@ -205,6 +205,8 @@ impl EventCache {
// Forget everything we know; we could have missed events, and we have
// no way to reconcile at the moment!
// TODO: implement Smart Matching™,
todo!();
/*
let store = inner.store.lock().await;
let mut by_room = inner.by_room.write().await;
for room_id in by_room.keys() {
@@ -213,6 +215,7 @@ impl EventCache {
}
}
by_room.clear();
*/
}
Err(RecvError::Closed) => {
@@ -256,15 +259,17 @@ impl EventCache {
// We could have received events during a previous sync; remove them all, since
// we can't know where to insert the "initial events" with respect to
// them.
todo!();
/*
let store = self.inner.store.lock().await;
store.clear_room(room_id).await?;
*/
let _ = room_cache.inner.sender.send(RoomEventCacheUpdate::Clear);
room_cache
.inner
.append_events(
&**store,
events,
prev_batch,
Default::default(),
@@ -282,20 +287,14 @@ struct EventCacheInner {
/// on the owning client.
client: Weak<ClientInner>,
multiple_room_updates_lock: Mutex<()>,
/// Lazily-filled cache of live [`RoomEventCache`], once per room.
by_room: RwLock<BTreeMap<OwnedRoomId, RoomEventCache>>,
/// Backend used for storage.
///
/// [`Mutex`] is “fair”, as it is implemented as a FIFO. It is important to
/// ensure that multiple updates will be applied in the correct order, which
/// is enforced by taking the store lock when handling an update.
///
/// TODO: replace with a cross-process lock
store: Arc<Mutex<Arc<dyn EventCacheStore>>>,
/// Handles to keep alive the task listening to updates.
drop_handles: OnceLock<Arc<EventCacheDropHandles>>,
// TODO: that's the place to add a cross-process lock!
}
impl EventCacheInner {
@@ -308,7 +307,7 @@ impl EventCacheInner {
async fn handle_room_updates(&self, updates: RoomUpdates) -> Result<()> {
// First, take the lock that indicates we're processing updates, to avoid
// handling multiple updates concurrently.
let store = self.store.lock().await;
let lock = self.multiple_room_updates_lock.lock().await;
// Left rooms.
for (room_id, left_room_update) in updates.leave {
@@ -317,7 +316,7 @@ impl EventCacheInner {
continue;
};
if let Err(err) = room.inner.handle_left_room_update(&**store, left_room_update).await {
if let Err(err) = room.inner.handle_left_room_update(left_room_update).await {
// Non-fatal error, try to continue to the next room.
error!("handling left room update: {err}");
}
@@ -330,9 +329,7 @@ impl EventCacheInner {
continue;
};
if let Err(err) =
room.inner.handle_joined_room_update(&**store, joined_room_update).await
{
if let Err(err) = room.inner.handle_joined_room_update(joined_room_update).await {
// Non-fatal error, try to continue to the next room.
error!("handling joined room update: {err}");
}
@@ -371,7 +368,7 @@ impl EventCacheInner {
return Ok(None);
};
let room_event_cache = RoomEventCache::new(room, self.store.clone());
let room_event_cache = RoomEventCache::new(room);
by_room_guard.insert(room_id.to_owned(), room_event_cache.clone());
@@ -397,8 +394,8 @@ impl Debug for RoomEventCache {
impl RoomEventCache {
/// Create a new [`RoomEventCache`] using the given room and store.
fn new(room: Room, store: Arc<Mutex<Arc<dyn EventCacheStore>>>) -> Self {
Self { inner: Arc::new(RoomEventCacheInner::new(room, store)) }
fn new(room: Room) -> Self {
Self { inner: Arc::new(RoomEventCacheInner::new(room)) }
}
/// Subscribe to room updates for this room, after getting the initial list
@@ -408,9 +405,12 @@ impl RoomEventCache {
pub async fn subscribe(
&self,
) -> Result<(Vec<SyncTimelineEvent>, Receiver<RoomEventCacheUpdate>)> {
/*
let store = self.inner.store.lock().await;
Ok((store.room_events(self.inner.room.room_id()).await?, self.inner.sender.subscribe()))
*/
todo!()
}
/// Returns the oldest back-pagination token, that is, the one closest to
@@ -447,14 +447,12 @@ struct RoomEventCacheInner {
/// Sender part for subscribers to this room.
sender: Sender<RoomEventCacheUpdate>,
/// Backend used for storage, shared with the parent [`EventCacheInner`].
///
/// See comment there.
store: Arc<Mutex<Arc<dyn EventCacheStore>>>,
/// The Client [`Room`] this event cache pertains to.
room: Room,
/// The events of the room.
events: RwLock<RoomEvents>,
/// A notifier that we received a new pagination token.
pagination_token_notifier: Notify,
@@ -466,24 +464,20 @@ struct RoomEventCacheInner {
impl RoomEventCacheInner {
/// Creates a new cache for a room, and subscribes to room updates, so as
/// to handle new timeline events.
fn new(room: Room, store: Arc<Mutex<Arc<dyn EventCacheStore>>>) -> Self {
fn new(room: Room) -> Self {
let sender = Sender::new(32);
Self {
room,
store,
events: RwLock::new(RoomEvents::default()),
sender,
pagination_lock: Default::default(),
pagination_token_notifier: Default::default(),
}
}
async fn handle_joined_room_update(
&self,
store: &dyn EventCacheStore,
updates: JoinedRoomUpdate,
) -> Result<()> {
async fn handle_joined_room_update(&self, updates: JoinedRoomUpdate) -> Result<()> {
self.handle_timeline(
store,
updates.timeline,
updates.ephemeral.clone(),
updates.account_data,
@@ -495,7 +489,6 @@ impl RoomEventCacheInner {
async fn handle_timeline(
&self,
store: &dyn EventCacheStore,
timeline: Timeline,
ephemeral: Vec<Raw<AnySyncEphemeralRoomEvent>>,
account_data: Vec<Raw<AnyRoomAccountDataEvent>>,
@@ -508,7 +501,7 @@ impl RoomEventCacheInner {
trace!("limited timeline, clearing all previous events");
// Clear internal state (events, pagination tokens, etc.).
store.clear_room(self.room.room_id()).await?;
self.events.write().await.reset();
// Propagate to observers.
let _ = self.sender.send(RoomEventCacheUpdate::Clear);
@@ -517,7 +510,6 @@ impl RoomEventCacheInner {
// Add all the events to the backend.
trace!("adding new events");
self.append_events(
store,
timeline.events,
timeline.prev_batch,
account_data,
@@ -529,19 +521,9 @@ impl RoomEventCacheInner {
Ok(())
}
async fn handle_left_room_update(
&self,
store: &dyn EventCacheStore,
updates: LeftRoomUpdate,
) -> Result<()> {
self.handle_timeline(
store,
updates.timeline,
Vec::new(),
Vec::new(),
updates.ambiguity_changes,
)
.await?;
async fn handle_left_room_update(&self, updates: LeftRoomUpdate) -> Result<()> {
self.handle_timeline(updates.timeline, Vec::new(), Vec::new(), updates.ambiguity_changes)
.await?;
Ok(())
}
@@ -549,7 +531,6 @@ impl RoomEventCacheInner {
/// observers.
async fn append_events(
&self,
store: &dyn EventCacheStore,
events: Vec<SyncTimelineEvent>,
prev_batch: Option<String>,
account_data: Vec<Raw<AnyRoomAccountDataEvent>>,
@@ -565,21 +546,17 @@ impl RoomEventCacheInner {
return Ok(());
}
let room_id = self.room.room_id();
// Add the previous back-pagination token (if present), followed by the timeline
// events themselves.
let gap_with_token = prev_batch
.clone()
.map(|val| TimelineEntry::Gap { prev_token: PaginationToken(val) })
.into_iter();
{
let mut room_events = self.events.write().await;
store
.append_room_entries(
room_id,
gap_with_token.chain(events.iter().cloned().map(TimelineEntry::Event)).collect(),
)
.await?;
if let Some(prev_token) = &prev_batch {
room_events.push_gap(Gap { prev_token: PaginationToken(prev_token.clone()) });
}
room_events.push_events(events.clone().into_iter());
}
if prev_batch.is_some() {
self.pagination_token_notifier.notify_one();
@@ -610,13 +587,27 @@ impl RoomEventCacheInner {
// Make sure there's at most one back-pagination request.
let _guard = self.pagination_lock.lock().await;
if let Some(token) = token.as_ref() {
let store = self.store.lock().await;
if !store.contains_gap(self.room.room_id(), token).await? {
// Make sure the `RoomEvents` isn't updated while we are back-paginating.
let mut room_events = self.events.write().await;
// Check that the `token` exists if any.
let gap_identifier = if let Some(token) = token.as_ref() {
let gap_identifier = room_events.chunk_identifier(|chunk| {
matches!(chunk.content(), ChunkContent::Gap(Gap { ref prev_token }) if prev_token == token)
});
// The method has been called with `token` but it doesn't exist in `RoomEvents`,
// it's an error.
if gap_identifier.is_none() {
return Err(EventCacheError::UnknownBackpaginationToken);
}
}
gap_identifier
} else {
None
};
// Get messages.
let messages = self
.room
.messages(assign!(MessagesOptions::backward(), {
@@ -629,7 +620,8 @@ impl RoomEventCacheInner {
// Would we want to backpaginate again, we'd start from the `end` token as the
// next `from` token.
let prev_token = messages.end;
let prev_token =
messages.end.map(|prev_token| Gap { prev_token: PaginationToken(prev_token) });
// If this token is missing, then we've reached the end of the timeline.
let reached_start = prev_token.is_none();
@@ -640,41 +632,64 @@ impl RoomEventCacheInner {
// should be prepended first).
let events = messages.chunk;
// Prepend the previous token (if any) at the beginning of the timeline,
// followed by the events received in the response (in reverse order).
let new_gap = prev_token
.map(|token| TimelineEntry::Gap { prev_token: PaginationToken(token) })
.into_iter();
let sync_events = events
.iter()
// Reverse the order of the events as `/messages` has been called with `dir=b`
// (backward). The `RoomEvents` API expects the first event to be the oldest.
.rev()
.cloned()
.map(|timeline_event| SyncTimelineEvent::from(timeline_event));
// For storage, reverse events to store them in the normal (non-reversed order).
//
// It's fine to convert from `TimelineEvent` (i.e. that has a room id) to
// `SyncTimelineEvent` (i.e. that doesn't have it), because those events are
// always tied to a room in storage anyways.
let new_events = events.iter().rev().map(|ev| TimelineEntry::Event(ev.clone().into()));
// There is a `token`/gap, let's replace it by new events!
if let Some(gap_identifier) = gap_identifier {
// Replace the gap by new events.
room_events
.replace_gap_at(sync_events, gap_identifier)
// SAFETY: we are sure that `gap_identifier` represents a valid `ChunkIdentifier`
// for a gap.
.unwrap();
let replaced = self
.store
.lock()
.await
.replace_gap(self.room.room_id(), token.as_ref(), new_gap.chain(new_events).collect())
.await?;
// And insert a new gap if there is any `prev_token`.
if let Some(prev_token_gap) = prev_token {
room_events
.insert_gap_at(prev_token_gap, gap_identifier.into())
// SAFETY: we are sure that `gap_identifier` represents a valid
// `ChunkIdentifier` for a gap.
.unwrap();
}
if !replaced {
// The previous token disappeared!
// This can happen if we got a limited timeline and lost track of our pagination
// token, because the whole timeline has been reset.
//
// TODO: With smarter reconciliation, this might get away. In the meanwhile,
// early return and forget about all the events.
trace!("gap was missing, likely because we observed a gappy sync response");
Ok(BackPaginationOutcome::UnknownBackpaginationToken)
} else {
trace!("replaced gap with new events from backpagination");
// TODO: implement smarter reconciliation later
//let _ = self.sender.send(RoomEventCacheUpdate::Prepend { events });
Ok(BackPaginationOutcome::Success { events, reached_start })
}
// There is no `token`/gap identifier. Let's assume we must prepend the new events.
else {
let first_item_position =
room_events.events().nth(0).map(|(item_position, _)| item_position);
match first_item_position {
// Is there a first item? Insert at this position.
Some(item_position) => {
if let Some(prev_token_gap) = prev_token {
room_events.insert_gap_at(prev_token_gap, item_position).unwrap();
}
room_events.insert_events_at(sync_events, item_position).unwrap();
}
// There is no first item. Let's simply push.
None => {
if let Some(prev_token_gap) = prev_token {
room_events.push_gap(prev_token_gap);
}
room_events.push_events(sync_events);
}
}
Ok(BackPaginationOutcome::Success { events, reached_start })
}
}
@@ -689,6 +704,8 @@ impl RoomEventCacheInner {
max_wait: Option<Duration>,
) -> Result<Option<PaginationToken>> {
// Optimistically try to return the backpagination token immediately.
todo!();
/*
if let Some(token) =
self.store.lock().await.oldest_backpagination_token(self.room.room_id()).await?
{
@@ -705,6 +722,7 @@ impl RoomEventCacheInner {
let _ = timeout(max_wait, self.pagination_token_notifier.notified()).await;
self.store.lock().await.oldest_backpagination_token(self.room.room_id()).await
*/
}
}
@@ -758,7 +776,6 @@ pub enum RoomEventCacheUpdate {
#[cfg(test)]
mod tests {
use assert_matches2::assert_matches;
use matrix_sdk_common::executor::spawn;
use matrix_sdk_test::{async_test, sync_timeline_event};
@@ -803,6 +820,7 @@ mod tests {
assert_matches!(res.unwrap_err(), EventCacheError::UnknownBackpaginationToken);
}
/*
// Those tests require time to work, and it does not on wasm32.
#[cfg(not(target_arch = "wasm32"))]
mod time_tests {
@@ -1003,4 +1021,5 @@ mod tests {
insert_token_task.await.unwrap();
}
}
*/
}