diff --git a/crates/matrix-sdk-ui/src/timeline/builder.rs b/crates/matrix-sdk-ui/src/timeline/builder.rs index cea766068..d3e49291a 100644 --- a/crates/matrix-sdk-ui/src/timeline/builder.rs +++ b/crates/matrix-sdk-ui/src/timeline/builder.rs @@ -12,21 +12,19 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::{iter, pin::pin, sync::Arc}; +use std::sync::Arc; use async_std::sync::Mutex; -use futures_util::StreamExt; use imbl::Vector; use matrix_sdk::{ - deserialized_responses::SyncTimelineEvent, executor::spawn, room, sync::RoomUpdate, Client, -}; -use ruma::{ - events::receipt::{ReceiptThread, ReceiptType}, - OwnedRoomId, + deserialized_responses::SyncTimelineEvent, executor::spawn, room, sync::RoomUpdate, }; +use ruma::events::receipt::{ReceiptThread, ReceiptType}; use tokio::sync::{broadcast, mpsc}; use tracing::{error, info, warn}; +#[cfg(feature = "e2e-encryption")] +use super::to_device::{handle_forwarded_room_key_event, handle_room_key_event}; use super::{inner::TimelineInner, queue::send_queued_messages, Timeline, TimelineDropHandle}; /// Builder that allows creating and configuring various parts of a @@ -191,30 +189,24 @@ impl TimelineBuilder { } }); + // Not using room.add_event_handler here because RoomKey events are + // to-device events that are not received in the context of a room. + #[cfg(feature = "e2e-encryption")] - let room_key_update_join_handle = { - let inner = inner.clone(); - match client.encryption().room_keys_for_room_received_stream(room.room_id()).await { - Some(stream) => spawn(async move { - let mut stream = pin!(stream); - while let Some(vec) = stream.next().await { - for room_key_info in vec { - retry_decryption( - client.clone(), - inner.clone(), - room_key_info.room_id, - room_key_info.session_id, - ) - .await; - } - } - }), - None => { - error!("Can't listen for room key updates, OlmMachine not set up"); - spawn(async move {}) - } - } - }; + let room_key_handle = client + .add_event_handler(handle_room_key_event(inner.clone(), room.room_id().to_owned())); + #[cfg(feature = "e2e-encryption")] + let forwarded_room_key_handle = client.add_event_handler(handle_forwarded_room_key_event( + inner.clone(), + room.room_id().to_owned(), + )); + + let handles = vec![ + #[cfg(feature = "e2e-encryption")] + room_key_handle, + #[cfg(feature = "e2e-encryption")] + forwarded_room_key_handle, + ]; let (msg_sender, msg_receiver) = mpsc::channel(1); if !read_only { @@ -229,9 +221,9 @@ impl TimelineBuilder { _end_token: Mutex::new(None), msg_sender, drop_handle: Arc::new(TimelineDropHandle { + client, + event_handler_handles: handles, room_update_join_handle, - #[cfg(feature = "e2e-encryption")] - room_key_update_join_handle, }), }; @@ -250,18 +242,3 @@ impl TimelineBuilder { timeline } } - -#[tracing::instrument(skip(client, inner))] -async fn retry_decryption( - client: Client, - inner: Arc, - room_id: OwnedRoomId, - session_id: String, -) { - let Some(room) = client.get_room(&room_id) else { - error!("Failed to fetch room object"); - return; - }; - - inner.retry_event_decryption(&room, Some(iter::once(session_id.as_str()).collect())).await; -} diff --git a/crates/matrix-sdk-ui/src/timeline/mod.rs b/crates/matrix-sdk-ui/src/timeline/mod.rs index aef01abda..dfdcdd609 100644 --- a/crates/matrix-sdk-ui/src/timeline/mod.rs +++ b/crates/matrix-sdk-ui/src/timeline/mod.rs @@ -24,9 +24,10 @@ use futures_core::Stream; use imbl::Vector; use matrix_sdk::{ attachment::AttachmentConfig, + event_handler::EventHandlerHandle, executor::JoinHandle, room::{self, Joined, MessagesOptions, Receipts, Room}, - Result, + Client, Result, }; use mime::Mime; use pin_project_lite::pin_project; @@ -59,6 +60,8 @@ mod read_receipts; mod sliding_sync_ext; #[cfg(test)] mod tests; +#[cfg(feature = "e2e-encryption")] +mod to_device; mod traits; mod virtual_item; @@ -649,16 +652,17 @@ impl Timeline { #[derive(Debug)] struct TimelineDropHandle { + client: Client, + event_handler_handles: Vec, room_update_join_handle: JoinHandle<()>, - #[cfg(feature = "e2e-encryption")] - room_key_update_join_handle: JoinHandle<()>, } impl Drop for TimelineDropHandle { fn drop(&mut self) { + for handle in self.event_handler_handles.drain(..) { + self.client.remove_event_handler(handle); + } self.room_update_join_handle.abort(); - #[cfg(feature = "e2e-encryption")] - self.room_key_update_join_handle.abort(); } } diff --git a/crates/matrix-sdk-ui/src/timeline/to_device.rs b/crates/matrix-sdk-ui/src/timeline/to_device.rs new file mode 100644 index 000000000..b30f204cc --- /dev/null +++ b/crates/matrix-sdk-ui/src/timeline/to_device.rs @@ -0,0 +1,79 @@ +// Copyright 2023 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::{iter, sync::Arc}; + +use matrix_sdk::{event_handler::EventHandler, Client}; +use ruma::{ + events::{forwarded_room_key::ToDeviceForwardedRoomKeyEvent, room_key::ToDeviceRoomKeyEvent}, + OwnedRoomId, +}; +use tracing::{debug_span, error, trace, Instrument}; + +use super::inner::TimelineInner; + +pub(super) fn handle_room_key_event( + inner: Arc, + room_id: OwnedRoomId, +) -> impl EventHandler { + move |event: ToDeviceRoomKeyEvent, client: Client| { + let inner = inner.clone(); + let room_id = room_id.clone(); + async move { + let event_room_id = event.content.room_id; + let session_id = event.content.session_id; + retry_decryption(client, inner, room_id, event_room_id, session_id).await; + } + .instrument(debug_span!("handle_room_key_event")) + } +} + +pub(super) fn handle_forwarded_room_key_event( + inner: Arc, + room_id: OwnedRoomId, +) -> impl EventHandler { + move |event: ToDeviceForwardedRoomKeyEvent, client: Client| { + let inner = inner.clone(); + let room_id = room_id.clone(); + async move { + let event_room_id = event.content.room_id; + let session_id = event.content.session_id; + retry_decryption(client, inner, room_id, event_room_id, session_id).await; + } + .instrument(debug_span!("handle_forwarded_room_key_event")) + } +} + +async fn retry_decryption( + client: Client, + inner: Arc, + room_id: OwnedRoomId, + event_room_id: OwnedRoomId, + session_id: String, +) { + if event_room_id != room_id { + trace!( + ?event_room_id, timeline_room_id = ?room_id, ?session_id, + "Received to-device room key event for a different room, ignoring" + ); + return; + } + + let Some(room) = client.get_room(&room_id) else { + error!("Failed to fetch room object"); + return; + }; + + inner.retry_event_decryption(&room, Some(iter::once(session_id.as_str()).collect())).await; +}