fix: RoomEventCache::subscribe is now infallible.

This patch updates `RoomEventCache::subscribe` to be infallible. This
method wasn't able to return something else than an `Ok`. The return
type has been updated from `Result<T>` to `T`.
This commit is contained in:
Ivan Enderlin
2025-02-11 22:50:14 +01:00
parent 714caae545
commit ed16e91aed
6 changed files with 53 additions and 53 deletions

View File

@@ -160,7 +160,7 @@ impl TimelineBuilder {
event_cache.subscribe()?;
let (room_event_cache, event_cache_drop) = room.event_cache().await?;
let (_, mut event_subscriber) = room_event_cache.subscribe().await?;
let (_, mut event_subscriber) = room_event_cache.subscribe().await;
let is_pinned_events = matches!(focus, TimelineFocus::PinnedEvents { .. });
let is_room_encrypted = room.is_encrypted().await.ok().unwrap_or_default();
@@ -241,15 +241,14 @@ impl TimelineBuilder {
//
// If we can't get a handle on the room cache's events, just clear the
// current timeline.
match room_event_cache.subscribe().await {
Ok((events, _)) => {
inner.replace_with_initial_remote_events(events.into_iter(), RemoteEventOrigin::Sync).await;
}
Err(err) => {
warn!("Error when re-inserting initial events into the timeline: {err}");
inner.clear().await;
}
}
let (initial_events, _stream) = room_event_cache.subscribe().await;
inner
.replace_with_initial_remote_events(
initial_events.into_iter(),
RemoteEventOrigin::Sync,
)
.await;
continue;
}
@@ -264,13 +263,15 @@ impl TimelineBuilder {
RoomEventCacheUpdate::UpdateTimelineEvents { diffs, origin } => {
trace!("Received new timeline events diffs");
inner.handle_remote_events_with_diffs(
diffs,
match origin {
EventsOrigin::Sync => RemoteEventOrigin::Sync,
EventsOrigin::Pagination => RemoteEventOrigin::Pagination,
}
).await;
inner
.handle_remote_events_with_diffs(
diffs,
match origin {
EventsOrigin::Sync => RemoteEventOrigin::Sync,
EventsOrigin::Pagination => RemoteEventOrigin::Pagination,
},
)
.await;
}
RoomEventCacheUpdate::AddEphemeralEvents { events } => {

View File

@@ -319,8 +319,7 @@ impl<P: RoomDataProvider> TimelineController<P> {
match &*focus_guard {
TimelineFocusData::Live => {
// Retrieve the cached events, and add them to the timeline.
let (events, _stream) =
room_event_cache.subscribe().await.map_err(Error::EventCacheError)?;
let (events, _stream) = room_event_cache.subscribe().await;
let has_events = !events.is_empty();

View File

@@ -754,7 +754,7 @@ mod tests {
let (room_event_cache, _drop_handles) = event_cache.for_room(room_id).await.unwrap();
let (events, mut stream) = room_event_cache.subscribe().await.unwrap();
let (events, mut stream) = room_event_cache.subscribe().await;
assert!(events.is_empty());
@@ -912,7 +912,7 @@ mod tests {
let room = client.get_room(room_id).unwrap();
let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
let (initial_events, _) = room_event_cache.subscribe().await.unwrap();
let (initial_events, _) = room_event_cache.subscribe().await;
// `add_initial_events` had an effect.
assert_eq!(initial_events.len(), 1);
}

View File

@@ -77,11 +77,11 @@ impl RoomEventCache {
/// Subscribe to this room updates, after getting the initial list of
/// events.
pub async fn subscribe(&self) -> Result<(Vec<TimelineEvent>, Receiver<RoomEventCacheUpdate>)> {
pub async fn subscribe(&self) -> (Vec<TimelineEvent>, Receiver<RoomEventCacheUpdate>) {
let state = self.inner.state.read().await;
let events = state.events().events().map(|(_position, item)| item.clone()).collect();
Ok((events, self.inner.sender.subscribe()))
(events, self.inner.sender.subscribe())
}
/// Return a [`RoomPagination`] API object useful for running
@@ -1284,7 +1284,7 @@ mod tests {
// The in-memory linked chunk keeps the bundled relation.
{
let (events, _) = room_event_cache.subscribe().await.unwrap();
let (events, _) = room_event_cache.subscribe().await;
assert_eq!(events.len(), 1);
@@ -1402,7 +1402,7 @@ mod tests {
let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
let (items, mut stream) = room_event_cache.subscribe().await.unwrap();
let (items, mut stream) = room_event_cache.subscribe().await;
// The rooms knows about the cached events.
assert!(room_event_cache.event(event_id1).await.is_some());
@@ -1427,7 +1427,7 @@ mod tests {
// The room event cache has forgotten about the events.
assert!(room_event_cache.event(event_id1).await.is_none());
let (items, _) = room_event_cache.subscribe().await.unwrap();
let (items, _) = room_event_cache.subscribe().await;
assert!(items.is_empty());
// The event cache store too.
@@ -1517,7 +1517,7 @@ mod tests {
let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
let (items, _stream) = room_event_cache.subscribe().await.unwrap();
let (items, _stream) = room_event_cache.subscribe().await;
// The reloaded room must contain the two events.
assert_eq!(items.len(), 2);
@@ -1536,7 +1536,7 @@ mod tests {
// when subscribing, to check that the items correspond to their new
// positions. The duplicated item is removed (so it's not the first
// element anymore), and it's added to the back of the list.
let (items, _stream) = room_event_cache.subscribe().await.unwrap();
let (items, _stream) = room_event_cache.subscribe().await;
assert_eq!(items.len(), 2);
assert_eq!(items[0].event_id().unwrap(), event_id1);
assert_eq!(items[1].event_id().unwrap(), event_id2);
@@ -1586,7 +1586,7 @@ mod tests {
let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
let (items, _stream) = room_event_cache.subscribe().await.unwrap();
let (items, _stream) = room_event_cache.subscribe().await;
// Because the persisted content was invalid, the room store is reset: there are
// no events in the cache.

View File

@@ -69,7 +69,7 @@ async fn test_event_cache_receives_events() {
// If I create a room event subscriber,
let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
let (events, mut subscriber) = room_event_cache.subscribe().await.unwrap();
let (events, mut subscriber) = room_event_cache.subscribe().await;
// Then at first it's empty, and the subscriber doesn't yield anything.
assert!(events.is_empty());
@@ -143,7 +143,7 @@ async fn test_ignored_unignored() {
// And subscribe to the room,
let room = client.get_room(room_id).unwrap();
let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
let (events, mut room_stream) = room_event_cache.subscribe().await.unwrap();
let (events, mut room_stream) = room_event_cache.subscribe().await;
// Then at first it contains the two initial events.
assert_eq!(events.len(), 2);
@@ -201,7 +201,7 @@ async fn test_ignored_unignored() {
{
let room = client.get_room(other_room_id).unwrap();
let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
let (events, _) = room_event_cache.subscribe().await.unwrap();
let (events, _) = room_event_cache.subscribe().await;
assert!(events.is_empty());
}
@@ -256,7 +256,7 @@ async fn test_backpaginate_once() {
let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
let (events, mut room_stream) = room_event_cache.subscribe().await.unwrap();
let (events, mut room_stream) = room_event_cache.subscribe().await;
// This is racy: either the initial message has been processed by the event
// cache (and no room updates will happen in this case), or it hasn't, and
@@ -341,7 +341,7 @@ async fn test_backpaginate_many_times_with_many_iterations() {
let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
let (events, mut room_stream) = room_event_cache.subscribe().await.unwrap();
let (events, mut room_stream) = room_event_cache.subscribe().await;
// This is racy: either the initial message has been processed by the event
// cache (and no room updates will happen in this case), or it hasn't, and
@@ -437,7 +437,7 @@ async fn test_backpaginate_many_times_with_many_iterations() {
assert!(room_stream.is_empty());
// And next time I'll open the room, I'll get the events in the right order.
let (events, room_stream) = room_event_cache.subscribe().await.unwrap();
let (events, room_stream) = room_event_cache.subscribe().await;
assert_event_matches_msg(&events[0], "oh well");
assert_event_matches_msg(&events[1], "hello");
@@ -479,7 +479,7 @@ async fn test_backpaginate_many_times_with_one_iteration() {
let (room_event_cache, _drop_handles) =
client.get_room(room_id).unwrap().event_cache().await.unwrap();
let (events, mut room_stream) = room_event_cache.subscribe().await.unwrap();
let (events, mut room_stream) = room_event_cache.subscribe().await;
// This is racy: either the initial message has been processed by the event
// cache (and no room updates will happen in this case), or it hasn't, and
@@ -577,7 +577,7 @@ async fn test_backpaginate_many_times_with_one_iteration() {
});
// And next time I'll open the room, I'll get the events in the right order.
let (events, room_stream) = room_event_cache.subscribe().await.unwrap();
let (events, room_stream) = room_event_cache.subscribe().await;
assert_event_matches_msg(&events[0], "oh well");
assert_event_matches_msg(&events[1], "hello");
@@ -619,7 +619,7 @@ async fn test_reset_while_backpaginating() {
let (room_event_cache, _drop_handles) =
client.get_room(room_id).unwrap().event_cache().await.unwrap();
let (events, mut room_stream) = room_event_cache.subscribe().await.unwrap();
let (events, mut room_stream) = room_event_cache.subscribe().await;
wait_for_initial_events(events, &mut room_stream).await;
@@ -763,7 +763,7 @@ async fn test_backpaginating_without_token() {
let room = server.sync_joined_room(&client, room_id).await;
let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
let (events, mut room_stream) = room_event_cache.subscribe().await.unwrap();
let (events, mut room_stream) = room_event_cache.subscribe().await;
assert!(events.is_empty());
assert!(room_stream.is_empty());
@@ -821,7 +821,7 @@ async fn test_limited_timeline_resets_pagination() {
let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
let (events, mut room_stream) = room_event_cache.subscribe().await.unwrap();
let (events, mut room_stream) = room_event_cache.subscribe().await;
assert!(events.is_empty());
assert!(room_stream.is_empty());
@@ -908,7 +908,7 @@ async fn test_limited_timeline_with_storage() {
)
.await;
let (initial_events, mut subscriber) = room_event_cache.subscribe().await.unwrap();
let (initial_events, mut subscriber) = room_event_cache.subscribe().await;
// This is racy: either the sync has been handled, or it hasn't yet.
if initial_events.is_empty() {
@@ -980,7 +980,7 @@ async fn test_limited_timeline_without_storage() {
)
.await;
let (initial_events, mut subscriber) = room_event_cache.subscribe().await.unwrap();
let (initial_events, mut subscriber) = room_event_cache.subscribe().await;
// This is racy: either the sync has been handled, or it hasn't yet.
if initial_events.is_empty() {
@@ -1115,7 +1115,7 @@ async fn test_backpaginate_with_no_initial_events() {
pagination.run_backwards(20, once).await.unwrap();
// The linked chunk should contain the events in the correct order.
let (events, _stream) = room_event_cache.subscribe().await.unwrap();
let (events, _stream) = room_event_cache.subscribe().await;
assert_eq!(events.len(), 3, "{events:?}");
assert_event_matches_msg(&events[0], "oh well");
@@ -1150,7 +1150,7 @@ async fn test_backpaginate_replace_empty_gap() {
let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
let (events, mut stream) = room_event_cache.subscribe().await.unwrap();
let (events, mut stream) = room_event_cache.subscribe().await;
wait_for_initial_events(events, &mut stream).await;
// The first back-pagination will return a previous-batch token, but no events.
@@ -1178,7 +1178,7 @@ async fn test_backpaginate_replace_empty_gap() {
pagination.run_backwards(20, once).await.unwrap();
// The linked chunk should contain the events in the correct order.
let (events, _stream) = room_event_cache.subscribe().await.unwrap();
let (events, _stream) = room_event_cache.subscribe().await;
assert_event_matches_msg(&events[0], "hello");
assert_event_matches_msg(&events[1], "world");
@@ -1219,7 +1219,7 @@ async fn test_no_gap_stored_after_deduplicated_sync() {
let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
let (events, mut stream) = room_event_cache.subscribe().await.unwrap();
let (events, mut stream) = room_event_cache.subscribe().await;
if events.is_empty() {
assert_let_timeout!(Ok(RoomEventCacheUpdate::UpdateTimelineEvents { .. }) = stream.recv());
@@ -1259,7 +1259,7 @@ async fn test_no_gap_stored_after_deduplicated_sync() {
let outcome = pagination.run_backwards(20, once).await.unwrap();
assert!(outcome.reached_start);
let (events, stream) = room_event_cache.subscribe().await.unwrap();
let (events, stream) = room_event_cache.subscribe().await;
assert_event_matches_msg(&events[0], "hello");
assert_event_matches_msg(&events[1], "world");
assert_event_matches_msg(&events[2], "sup");
@@ -1296,7 +1296,7 @@ async fn test_no_gap_stored_after_deduplicated_backpagination() {
let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
let (events, mut stream) = room_event_cache.subscribe().await.unwrap();
let (events, mut stream) = room_event_cache.subscribe().await;
if events.is_empty() {
assert_let_timeout!(Ok(RoomEventCacheUpdate::UpdateTimelineEvents { .. }) = stream.recv());
@@ -1391,7 +1391,7 @@ async fn test_no_gap_stored_after_deduplicated_backpagination() {
assert!(outcome.events.is_empty());
assert!(stream.is_empty());
let (events, stream) = room_event_cache.subscribe().await.unwrap();
let (events, stream) = room_event_cache.subscribe().await;
assert_event_matches_msg(&events[0], "hello");
assert_event_matches_msg(&events[1], "world");
assert_event_matches_msg(&events[2], "sup");
@@ -1428,7 +1428,7 @@ async fn test_dont_delete_gap_that_wasnt_inserted() {
let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
let (events, mut stream) = room_event_cache.subscribe().await.unwrap();
let (events, mut stream) = room_event_cache.subscribe().await;
if events.is_empty() {
assert_let_timeout!(Ok(RoomEventCacheUpdate::UpdateTimelineEvents { .. }) = stream.recv());
}
@@ -1492,7 +1492,7 @@ async fn test_apply_redaction_when_redaction_comes_later() {
let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
// Wait for the first event.
let (events, mut subscriber) = room_event_cache.subscribe().await.unwrap();
let (events, mut subscriber) = room_event_cache.subscribe().await;
if events.is_empty() {
assert_let_timeout!(
Ok(RoomEventCacheUpdate::UpdateTimelineEvents { .. }) = subscriber.recv()
@@ -1554,7 +1554,7 @@ async fn test_apply_redaction_when_redacted_and_redaction_are_in_same_sync() {
let room_id = room_id!("!omelette:fromage.fr");
let room = server.sync_joined_room(&client, room_id).await;
let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
let (_events, mut subscriber) = room_event_cache.subscribe().await.unwrap();
let (_events, mut subscriber) = room_event_cache.subscribe().await;
let f = EventFactory::new().room(room_id).sender(user_id!("@a:b.c"));

View File

@@ -789,7 +789,7 @@ impl App {
Handle::current().block_on(async {
let (room_event_cache, _drop_handles) =
room.event_cache().await.unwrap();
let (events, _) = room_event_cache.subscribe().await.unwrap();
let (events, _) = room_event_cache.subscribe().await;
events
})
});