diff --git a/crates/matrix-sdk-ui/src/timeline/builder.rs b/crates/matrix-sdk-ui/src/timeline/builder.rs index b7b2ea53f..547bc45b3 100644 --- a/crates/matrix-sdk-ui/src/timeline/builder.rs +++ b/crates/matrix-sdk-ui/src/timeline/builder.rs @@ -12,7 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::{collections::BTreeSet, sync::Arc}; +use std::{ + collections::{BTreeMap, BTreeSet}, + sync::Arc, +}; use futures_core::Stream; use futures_util::{pin_mut, StreamExt}; @@ -257,34 +260,10 @@ impl TimelineBuilder { let handles = vec![room_key_handle, forwarded_room_key_handle]; - let room_key_from_backups_join_handle = { - let inner = controller.clone(); - let room_id = inner.room().room_id(); - - let stream = client.encryption().backups().room_keys_for_room_stream(room_id); - - spawn(async move { - pin_mut!(stream); - - while let Some(update) = stream.next().await { - let room = inner.room(); - - match update { - Ok(info) => { - let mut session_ids = BTreeSet::new(); - - for set in info.into_values() { - session_ids.extend(set); - } - - inner.retry_event_decryption(room, Some(session_ids)).await; - } - // We lagged, so retry every event. - Err(_) => inner.retry_event_decryption(room, None).await, - } - } - }) - }; + let room_key_from_backups_join_handle = spawn(room_keys_from_backups_task( + client.encryption().backups().room_keys_for_room_stream(controller.room().room_id()), + controller.clone(), + )); let room_key_backup_enabled_join_handle = { let inner = controller.clone(); @@ -518,3 +497,29 @@ async fn room_send_queue_update_task( } } } + +/// The task that handles the room keys from backups. +async fn room_keys_from_backups_task(stream: S, timeline_controller: TimelineController) +where + S: Stream>, BroadcastStreamRecvError>>, +{ + pin_mut!(stream); + + while let Some(update) = stream.next().await { + let room = timeline_controller.room(); + + match update { + Ok(info) => { + let mut session_ids = BTreeSet::new(); + + for set in info.into_values() { + session_ids.extend(set); + } + + timeline_controller.retry_event_decryption(room, Some(session_ids)).await; + } + // We lagged, so retry every event. + Err(_) => timeline_controller.retry_event_decryption(room, None).await, + } + } +}