diff --git a/crates/matrix-sdk-ui/src/timeline/builder.rs b/crates/matrix-sdk-ui/src/timeline/builder.rs index 98ca6969f..237f8f725 100644 --- a/crates/matrix-sdk-ui/src/timeline/builder.rs +++ b/crates/matrix-sdk-ui/src/timeline/builder.rs @@ -160,7 +160,7 @@ impl TimelineBuilder { event_cache.subscribe()?; let (room_event_cache, event_cache_drop) = room.event_cache().await?; - let (_, mut event_subscriber) = room_event_cache.subscribe().await?; + let (_, mut event_subscriber) = room_event_cache.subscribe().await; let is_pinned_events = matches!(focus, TimelineFocus::PinnedEvents { .. }); let is_room_encrypted = room.is_encrypted().await.ok().unwrap_or_default(); @@ -241,15 +241,14 @@ impl TimelineBuilder { // // If we can't get a handle on the room cache's events, just clear the // current timeline. - match room_event_cache.subscribe().await { - Ok((events, _)) => { - inner.replace_with_initial_remote_events(events.into_iter(), RemoteEventOrigin::Sync).await; - } - Err(err) => { - warn!("Error when re-inserting initial events into the timeline: {err}"); - inner.clear().await; - } - } + let (initial_events, _stream) = room_event_cache.subscribe().await; + + inner + .replace_with_initial_remote_events( + initial_events.into_iter(), + RemoteEventOrigin::Sync, + ) + .await; continue; } @@ -264,13 +263,15 @@ impl TimelineBuilder { RoomEventCacheUpdate::UpdateTimelineEvents { diffs, origin } => { trace!("Received new timeline events diffs"); - inner.handle_remote_events_with_diffs( - diffs, - match origin { - EventsOrigin::Sync => RemoteEventOrigin::Sync, - EventsOrigin::Pagination => RemoteEventOrigin::Pagination, - } - ).await; + inner + .handle_remote_events_with_diffs( + diffs, + match origin { + EventsOrigin::Sync => RemoteEventOrigin::Sync, + EventsOrigin::Pagination => RemoteEventOrigin::Pagination, + }, + ) + .await; } RoomEventCacheUpdate::AddEphemeralEvents { events } => { diff --git a/crates/matrix-sdk-ui/src/timeline/controller/mod.rs b/crates/matrix-sdk-ui/src/timeline/controller/mod.rs index 406f78e88..32c70f290 100644 --- a/crates/matrix-sdk-ui/src/timeline/controller/mod.rs +++ b/crates/matrix-sdk-ui/src/timeline/controller/mod.rs @@ -319,8 +319,7 @@ impl TimelineController

{ match &*focus_guard { TimelineFocusData::Live => { // Retrieve the cached events, and add them to the timeline. - let (events, _stream) = - room_event_cache.subscribe().await.map_err(Error::EventCacheError)?; + let (events, _stream) = room_event_cache.subscribe().await; let has_events = !events.is_empty(); diff --git a/crates/matrix-sdk/src/event_cache/mod.rs b/crates/matrix-sdk/src/event_cache/mod.rs index ffccf40ff..500141498 100644 --- a/crates/matrix-sdk/src/event_cache/mod.rs +++ b/crates/matrix-sdk/src/event_cache/mod.rs @@ -754,7 +754,7 @@ mod tests { let (room_event_cache, _drop_handles) = event_cache.for_room(room_id).await.unwrap(); - let (events, mut stream) = room_event_cache.subscribe().await.unwrap(); + let (events, mut stream) = room_event_cache.subscribe().await; assert!(events.is_empty()); @@ -912,7 +912,7 @@ mod tests { let room = client.get_room(room_id).unwrap(); let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap(); - let (initial_events, _) = room_event_cache.subscribe().await.unwrap(); + let (initial_events, _) = room_event_cache.subscribe().await; // `add_initial_events` had an effect. assert_eq!(initial_events.len(), 1); } diff --git a/crates/matrix-sdk/src/event_cache/room/mod.rs b/crates/matrix-sdk/src/event_cache/room/mod.rs index 1d9b9ff51..7473001d9 100644 --- a/crates/matrix-sdk/src/event_cache/room/mod.rs +++ b/crates/matrix-sdk/src/event_cache/room/mod.rs @@ -77,11 +77,11 @@ impl RoomEventCache { /// Subscribe to this room updates, after getting the initial list of /// events. - pub async fn subscribe(&self) -> Result<(Vec, Receiver)> { + pub async fn subscribe(&self) -> (Vec, Receiver) { let state = self.inner.state.read().await; let events = state.events().events().map(|(_position, item)| item.clone()).collect(); - Ok((events, self.inner.sender.subscribe())) + (events, self.inner.sender.subscribe()) } /// Return a [`RoomPagination`] API object useful for running @@ -1284,7 +1284,7 @@ mod tests { // The in-memory linked chunk keeps the bundled relation. { - let (events, _) = room_event_cache.subscribe().await.unwrap(); + let (events, _) = room_event_cache.subscribe().await; assert_eq!(events.len(), 1); @@ -1402,7 +1402,7 @@ mod tests { let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap(); - let (items, mut stream) = room_event_cache.subscribe().await.unwrap(); + let (items, mut stream) = room_event_cache.subscribe().await; // The rooms knows about the cached events. assert!(room_event_cache.event(event_id1).await.is_some()); @@ -1427,7 +1427,7 @@ mod tests { // The room event cache has forgotten about the events. assert!(room_event_cache.event(event_id1).await.is_none()); - let (items, _) = room_event_cache.subscribe().await.unwrap(); + let (items, _) = room_event_cache.subscribe().await; assert!(items.is_empty()); // The event cache store too. @@ -1517,7 +1517,7 @@ mod tests { let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap(); - let (items, _stream) = room_event_cache.subscribe().await.unwrap(); + let (items, _stream) = room_event_cache.subscribe().await; // The reloaded room must contain the two events. assert_eq!(items.len(), 2); @@ -1536,7 +1536,7 @@ mod tests { // when subscribing, to check that the items correspond to their new // positions. The duplicated item is removed (so it's not the first // element anymore), and it's added to the back of the list. - let (items, _stream) = room_event_cache.subscribe().await.unwrap(); + let (items, _stream) = room_event_cache.subscribe().await; assert_eq!(items.len(), 2); assert_eq!(items[0].event_id().unwrap(), event_id1); assert_eq!(items[1].event_id().unwrap(), event_id2); @@ -1586,7 +1586,7 @@ mod tests { let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap(); - let (items, _stream) = room_event_cache.subscribe().await.unwrap(); + let (items, _stream) = room_event_cache.subscribe().await; // Because the persisted content was invalid, the room store is reset: there are // no events in the cache. diff --git a/crates/matrix-sdk/tests/integration/event_cache.rs b/crates/matrix-sdk/tests/integration/event_cache.rs index 1ce602efd..1daaa068d 100644 --- a/crates/matrix-sdk/tests/integration/event_cache.rs +++ b/crates/matrix-sdk/tests/integration/event_cache.rs @@ -69,7 +69,7 @@ async fn test_event_cache_receives_events() { // If I create a room event subscriber, let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap(); - let (events, mut subscriber) = room_event_cache.subscribe().await.unwrap(); + let (events, mut subscriber) = room_event_cache.subscribe().await; // Then at first it's empty, and the subscriber doesn't yield anything. assert!(events.is_empty()); @@ -143,7 +143,7 @@ async fn test_ignored_unignored() { // And subscribe to the room, let room = client.get_room(room_id).unwrap(); let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap(); - let (events, mut room_stream) = room_event_cache.subscribe().await.unwrap(); + let (events, mut room_stream) = room_event_cache.subscribe().await; // Then at first it contains the two initial events. assert_eq!(events.len(), 2); @@ -201,7 +201,7 @@ async fn test_ignored_unignored() { { let room = client.get_room(other_room_id).unwrap(); let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap(); - let (events, _) = room_event_cache.subscribe().await.unwrap(); + let (events, _) = room_event_cache.subscribe().await; assert!(events.is_empty()); } @@ -256,7 +256,7 @@ async fn test_backpaginate_once() { let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap(); - let (events, mut room_stream) = room_event_cache.subscribe().await.unwrap(); + let (events, mut room_stream) = room_event_cache.subscribe().await; // This is racy: either the initial message has been processed by the event // cache (and no room updates will happen in this case), or it hasn't, and @@ -341,7 +341,7 @@ async fn test_backpaginate_many_times_with_many_iterations() { let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap(); - let (events, mut room_stream) = room_event_cache.subscribe().await.unwrap(); + let (events, mut room_stream) = room_event_cache.subscribe().await; // This is racy: either the initial message has been processed by the event // cache (and no room updates will happen in this case), or it hasn't, and @@ -437,7 +437,7 @@ async fn test_backpaginate_many_times_with_many_iterations() { assert!(room_stream.is_empty()); // And next time I'll open the room, I'll get the events in the right order. - let (events, room_stream) = room_event_cache.subscribe().await.unwrap(); + let (events, room_stream) = room_event_cache.subscribe().await; assert_event_matches_msg(&events[0], "oh well"); assert_event_matches_msg(&events[1], "hello"); @@ -479,7 +479,7 @@ async fn test_backpaginate_many_times_with_one_iteration() { let (room_event_cache, _drop_handles) = client.get_room(room_id).unwrap().event_cache().await.unwrap(); - let (events, mut room_stream) = room_event_cache.subscribe().await.unwrap(); + let (events, mut room_stream) = room_event_cache.subscribe().await; // This is racy: either the initial message has been processed by the event // cache (and no room updates will happen in this case), or it hasn't, and @@ -577,7 +577,7 @@ async fn test_backpaginate_many_times_with_one_iteration() { }); // And next time I'll open the room, I'll get the events in the right order. - let (events, room_stream) = room_event_cache.subscribe().await.unwrap(); + let (events, room_stream) = room_event_cache.subscribe().await; assert_event_matches_msg(&events[0], "oh well"); assert_event_matches_msg(&events[1], "hello"); @@ -619,7 +619,7 @@ async fn test_reset_while_backpaginating() { let (room_event_cache, _drop_handles) = client.get_room(room_id).unwrap().event_cache().await.unwrap(); - let (events, mut room_stream) = room_event_cache.subscribe().await.unwrap(); + let (events, mut room_stream) = room_event_cache.subscribe().await; wait_for_initial_events(events, &mut room_stream).await; @@ -763,7 +763,7 @@ async fn test_backpaginating_without_token() { let room = server.sync_joined_room(&client, room_id).await; let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap(); - let (events, mut room_stream) = room_event_cache.subscribe().await.unwrap(); + let (events, mut room_stream) = room_event_cache.subscribe().await; assert!(events.is_empty()); assert!(room_stream.is_empty()); @@ -821,7 +821,7 @@ async fn test_limited_timeline_resets_pagination() { let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap(); - let (events, mut room_stream) = room_event_cache.subscribe().await.unwrap(); + let (events, mut room_stream) = room_event_cache.subscribe().await; assert!(events.is_empty()); assert!(room_stream.is_empty()); @@ -908,7 +908,7 @@ async fn test_limited_timeline_with_storage() { ) .await; - let (initial_events, mut subscriber) = room_event_cache.subscribe().await.unwrap(); + let (initial_events, mut subscriber) = room_event_cache.subscribe().await; // This is racy: either the sync has been handled, or it hasn't yet. if initial_events.is_empty() { @@ -980,7 +980,7 @@ async fn test_limited_timeline_without_storage() { ) .await; - let (initial_events, mut subscriber) = room_event_cache.subscribe().await.unwrap(); + let (initial_events, mut subscriber) = room_event_cache.subscribe().await; // This is racy: either the sync has been handled, or it hasn't yet. if initial_events.is_empty() { @@ -1115,7 +1115,7 @@ async fn test_backpaginate_with_no_initial_events() { pagination.run_backwards(20, once).await.unwrap(); // The linked chunk should contain the events in the correct order. - let (events, _stream) = room_event_cache.subscribe().await.unwrap(); + let (events, _stream) = room_event_cache.subscribe().await; assert_eq!(events.len(), 3, "{events:?}"); assert_event_matches_msg(&events[0], "oh well"); @@ -1150,7 +1150,7 @@ async fn test_backpaginate_replace_empty_gap() { let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap(); - let (events, mut stream) = room_event_cache.subscribe().await.unwrap(); + let (events, mut stream) = room_event_cache.subscribe().await; wait_for_initial_events(events, &mut stream).await; // The first back-pagination will return a previous-batch token, but no events. @@ -1178,7 +1178,7 @@ async fn test_backpaginate_replace_empty_gap() { pagination.run_backwards(20, once).await.unwrap(); // The linked chunk should contain the events in the correct order. - let (events, _stream) = room_event_cache.subscribe().await.unwrap(); + let (events, _stream) = room_event_cache.subscribe().await; assert_event_matches_msg(&events[0], "hello"); assert_event_matches_msg(&events[1], "world"); @@ -1219,7 +1219,7 @@ async fn test_no_gap_stored_after_deduplicated_sync() { let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap(); - let (events, mut stream) = room_event_cache.subscribe().await.unwrap(); + let (events, mut stream) = room_event_cache.subscribe().await; if events.is_empty() { assert_let_timeout!(Ok(RoomEventCacheUpdate::UpdateTimelineEvents { .. }) = stream.recv()); @@ -1259,7 +1259,7 @@ async fn test_no_gap_stored_after_deduplicated_sync() { let outcome = pagination.run_backwards(20, once).await.unwrap(); assert!(outcome.reached_start); - let (events, stream) = room_event_cache.subscribe().await.unwrap(); + let (events, stream) = room_event_cache.subscribe().await; assert_event_matches_msg(&events[0], "hello"); assert_event_matches_msg(&events[1], "world"); assert_event_matches_msg(&events[2], "sup"); @@ -1296,7 +1296,7 @@ async fn test_no_gap_stored_after_deduplicated_backpagination() { let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap(); - let (events, mut stream) = room_event_cache.subscribe().await.unwrap(); + let (events, mut stream) = room_event_cache.subscribe().await; if events.is_empty() { assert_let_timeout!(Ok(RoomEventCacheUpdate::UpdateTimelineEvents { .. }) = stream.recv()); @@ -1391,7 +1391,7 @@ async fn test_no_gap_stored_after_deduplicated_backpagination() { assert!(outcome.events.is_empty()); assert!(stream.is_empty()); - let (events, stream) = room_event_cache.subscribe().await.unwrap(); + let (events, stream) = room_event_cache.subscribe().await; assert_event_matches_msg(&events[0], "hello"); assert_event_matches_msg(&events[1], "world"); assert_event_matches_msg(&events[2], "sup"); @@ -1428,7 +1428,7 @@ async fn test_dont_delete_gap_that_wasnt_inserted() { let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap(); - let (events, mut stream) = room_event_cache.subscribe().await.unwrap(); + let (events, mut stream) = room_event_cache.subscribe().await; if events.is_empty() { assert_let_timeout!(Ok(RoomEventCacheUpdate::UpdateTimelineEvents { .. }) = stream.recv()); } @@ -1492,7 +1492,7 @@ async fn test_apply_redaction_when_redaction_comes_later() { let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap(); // Wait for the first event. - let (events, mut subscriber) = room_event_cache.subscribe().await.unwrap(); + let (events, mut subscriber) = room_event_cache.subscribe().await; if events.is_empty() { assert_let_timeout!( Ok(RoomEventCacheUpdate::UpdateTimelineEvents { .. }) = subscriber.recv() @@ -1554,7 +1554,7 @@ async fn test_apply_redaction_when_redacted_and_redaction_are_in_same_sync() { let room_id = room_id!("!omelette:fromage.fr"); let room = server.sync_joined_room(&client, room_id).await; let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap(); - let (_events, mut subscriber) = room_event_cache.subscribe().await.unwrap(); + let (_events, mut subscriber) = room_event_cache.subscribe().await; let f = EventFactory::new().room(room_id).sender(user_id!("@a:b.c")); diff --git a/labs/multiverse/src/main.rs b/labs/multiverse/src/main.rs index 2f9f3f27f..e1a16e00d 100644 --- a/labs/multiverse/src/main.rs +++ b/labs/multiverse/src/main.rs @@ -789,7 +789,7 @@ impl App { Handle::current().block_on(async { let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap(); - let (events, _) = room_event_cache.subscribe().await.unwrap(); + let (events, _) = room_event_cache.subscribe().await; events }) });