diff --git a/crates/matrix-sdk-ui/src/timeline/builder.rs b/crates/matrix-sdk-ui/src/timeline/builder.rs index b3e7ec8cd..44bba1843 100644 --- a/crates/matrix-sdk-ui/src/timeline/builder.rs +++ b/crates/matrix-sdk-ui/src/timeline/builder.rs @@ -23,6 +23,7 @@ use matrix_sdk::{ }; use ruma::{events::AnySyncTimelineEvent, RoomVersionId}; use tokio::sync::broadcast::error::RecvError; +use tokio_stream::wrappers::errors::BroadcastStreamRecvError; use tracing::{info, info_span, trace, warn, Instrument, Span}; use super::{ @@ -433,6 +434,47 @@ impl TimelineBuilder { }) }; + // TODO: Technically, this should be the only stream we need to listen to get + // notified when we should retry to decrypt an event. We sadly can't do that, + // since the cross-process support kills the `OlmMachine` which then in + // turn kills this stream. Once this is solved remove all the other ways we + // listen for room keys. + let room_keys_received_join_handle = { + let inner = controller.clone(); + let stream = client.encryption().room_keys_received_stream().await.expect( + "We should be logged in by now, so we should have access to an OlmMachine \ + to be able to listen to this stream", + ); + + spawn(async move { + pin_mut!(stream); + + while let Some(room_keys) = stream.next().await { + let session_ids = match room_keys { + Ok(room_keys) => { + let session_ids: BTreeSet = room_keys + .into_iter() + .filter(|info| info.room_id == inner.room().room_id()) + .map(|info| info.session_id) + .collect(); + + Some(session_ids) + } + Err(BroadcastStreamRecvError::Lagged(missed_updates)) => { + // We lagged, let's retry to decrypt anything we have, maybe something + // was received. + warn!(missed_updates, "The room keys stream has lagged, retrying to decrypt the whole timeline"); + + None + } + }; + + let room = inner.room(); + inner.retry_event_decryption(room, session_ids).await; + } + }) + }; + let timeline = Timeline { controller, event_cache: room_event_cache, @@ -443,6 +485,7 @@ impl TimelineBuilder { pinned_events_join_handle, room_key_from_backups_join_handle, room_key_backup_enabled_join_handle, + room_keys_received_join_handle, local_echo_listener_handle, _event_cache_drop_handle: event_cache_drop, encryption_changes_handle, diff --git a/crates/matrix-sdk-ui/src/timeline/mod.rs b/crates/matrix-sdk-ui/src/timeline/mod.rs index 9a9623e3f..927180093 100644 --- a/crates/matrix-sdk-ui/src/timeline/mod.rs +++ b/crates/matrix-sdk-ui/src/timeline/mod.rs @@ -832,6 +832,7 @@ struct TimelineDropHandle { room_update_join_handle: JoinHandle<()>, pinned_events_join_handle: Option>, room_key_from_backups_join_handle: JoinHandle<()>, + room_keys_received_join_handle: JoinHandle<()>, room_key_backup_enabled_join_handle: JoinHandle<()>, local_echo_listener_handle: JoinHandle<()>, _event_cache_drop_handle: Arc, @@ -852,6 +853,7 @@ impl Drop for TimelineDropHandle { self.room_update_join_handle.abort(); self.room_key_from_backups_join_handle.abort(); self.room_key_backup_enabled_join_handle.abort(); + self.room_keys_received_join_handle.abort(); self.encryption_changes_handle.abort(); } }