event cache: move reacting to (un)blocks in the event cache

This commit is contained in:
Benjamin Bouvier
2024-05-02 16:07:25 +02:00
parent 9ef465fdf4
commit ac6fa7abb1
3 changed files with 50 additions and 37 deletions

View File

@@ -245,21 +245,6 @@ impl TimelineBuilder {
.instrument(span)
});
let mut ignore_user_list_stream = client.subscribe_to_ignore_user_list_changes();
let ignore_user_list_update_join_handle = spawn({
let inner = inner.clone();
let span = info_span!(parent: Span::none(), "ignore_user_list_update_handler", room_id = ?room.room_id());
span.follows_from(Span::current());
async move {
while ignore_user_list_stream.next().await.is_some() {
inner.clear().await;
}
}
.instrument(span)
});
// Not using room.add_event_handler here because RoomKey events are
// to-device events that are not received in the context of a room.
@@ -321,7 +306,6 @@ impl TimelineBuilder {
client,
event_handler_handles: handles,
room_update_join_handle,
ignore_user_list_update_join_handle,
room_key_from_backups_join_handle,
_event_cache_drop_handle: event_cache_drop,
}),

View File

@@ -820,7 +820,6 @@ struct TimelineDropHandle {
client: Client,
event_handler_handles: Vec<EventHandlerHandle>,
room_update_join_handle: JoinHandle<()>,
ignore_user_list_update_join_handle: JoinHandle<()>,
room_key_from_backups_join_handle: JoinHandle<()>,
_event_cache_drop_handle: Arc<EventCacheDropHandles>,
}
@@ -831,7 +830,6 @@ impl Drop for TimelineDropHandle {
self.client.remove_event_handler(handle);
}
self.room_update_join_handle.abort();
self.ignore_user_list_update_join_handle.abort();
self.room_key_from_backups_join_handle.abort();
}
}

View File

@@ -48,6 +48,7 @@ use std::{
time::Duration,
};
use eyeball::Subscriber;
use matrix_sdk_base::{
deserialized_responses::{AmbiguityChange, SyncTimelineEvent, TimelineEvent},
sync::{JoinedRoomUpdate, LeftRoomUpdate, RoomUpdates, Timeline},
@@ -66,7 +67,7 @@ use tokio::{
},
time::timeout,
};
use tracing::{error, instrument, trace, warn};
use tracing::{error, info_span, instrument, trace, warn, Instrument as _, Span};
use self::{
linked_chunk::ChunkContent,
@@ -118,6 +119,9 @@ pub type Result<T> = std::result::Result<T, EventCacheError>;
pub struct EventCacheDropHandles {
/// Task that listens to room updates.
listen_updates_task: JoinHandle<()>,
/// Task that listens to updates to the user's ignored list.
ignore_user_list_update_task: JoinHandle<()>,
}
impl Debug for EventCacheDropHandles {
@@ -129,6 +133,7 @@ impl Debug for EventCacheDropHandles {
impl Drop for EventCacheDropHandles {
fn drop(&mut self) {
self.listen_updates_task.abort();
self.ignore_user_list_update_task.abort();
}
}
@@ -172,16 +177,39 @@ impl EventCache {
let _ = self.inner.drop_handles.get_or_init(|| {
// Spawn the task that will listen to all the room updates at once.
let room_updates_feed = client.subscribe_to_all_room_updates();
let listen_updates_task =
spawn(Self::listen_task(self.inner.clone(), room_updates_feed));
let listen_updates_task = spawn(Self::listen_task(
self.inner.clone(),
client.subscribe_to_all_room_updates(),
));
Arc::new(EventCacheDropHandles { listen_updates_task })
let ignore_user_list_update_task = spawn(Self::ignore_user_list_update_task(
self.inner.clone(),
client.subscribe_to_ignore_user_list_changes(),
));
Arc::new(EventCacheDropHandles { listen_updates_task, ignore_user_list_update_task })
});
Ok(())
}
#[instrument(skip_all)]
async fn ignore_user_list_update_task(
inner: Arc<EventCacheInner>,
mut ignore_user_list_stream: Subscriber<Vec<String>>,
) {
let span = info_span!(parent: Span::none(), "ignore_user_list_update_task");
span.follows_from(Span::current());
async move {
while ignore_user_list_stream.next().await.is_some() {
inner.clear_all_rooms().await;
}
}
.instrument(span)
.await;
}
#[instrument(skip_all)]
async fn listen_task(
inner: Arc<EventCacheInner>,
@@ -209,20 +237,7 @@ impl EventCache {
// no way to reconcile at the moment!
// TODO: implement Smart Matching™,
warn!(num_skipped, "Lagged behind room updates, clearing all rooms");
// Note: one must NOT clear the `by_room` map, because if something subscribed
// to a room update, they would never get any new update for that room, since
// re-creating the `RoomEventCache` would create a new unrelated sender.
let rooms = inner.by_room.write().await;
for room in rooms.values() {
// Notify all the observers that we've lost track of state. (We ignore the
// error if there aren't any.)
let _ = room.inner.sender.send(RoomEventCacheUpdate::Clear);
// Clear all the events in memory.
room.inner.events.write().await.reset();
}
inner.clear_all_rooms().await;
}
Err(RecvError::Closed) => {
@@ -301,6 +316,22 @@ impl EventCacheInner {
Ok(Client { inner: self.client.upgrade().ok_or(EventCacheError::ClientDropped)? })
}
/// Clears all the room's data.
async fn clear_all_rooms(&self) {
// Note: one must NOT clear the `by_room` map, because if something subscribed
// to a room update, they would never get any new update for that room, since
// re-creating the `RoomEventCache` would create a new unrelated sender.
let rooms = self.by_room.write().await;
for room in rooms.values() {
// Notify all the observers that we've lost track of state. (We ignore the
// error if there aren't any.)
let _ = room.inner.sender.send(RoomEventCacheUpdate::Clear);
// Clear all the events in memory.
room.inner.events.write().await.reset();
}
}
/// Handles a single set of room updates at once.
#[instrument(skip(self, updates))]
async fn handle_room_updates(&self, updates: RoomUpdates) -> Result<()> {