diff --git a/crates/matrix-sdk-ui/src/lib.rs b/crates/matrix-sdk-ui/src/lib.rs index 32dc10a78..8d4b2a5b0 100644 --- a/crates/matrix-sdk-ui/src/lib.rs +++ b/crates/matrix-sdk-ui/src/lib.rs @@ -23,3 +23,13 @@ pub mod timeline; #[cfg(feature = "experimental-room-list")] pub use self::room_list_service::RoomListService; pub use self::timeline::Timeline; + +#[cfg(all(test, not(target_arch = "wasm32")))] +#[ctor::ctor] +fn init_logging() { + use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; + tracing_subscriber::registry() + .with(tracing_subscriber::EnvFilter::from_default_env()) + .with(tracing_subscriber::fmt::layer().with_test_writer()) + .init(); +} diff --git a/crates/matrix-sdk-ui/src/timeline/builder.rs b/crates/matrix-sdk-ui/src/timeline/builder.rs index 4d6169c0d..86bbef3bb 100644 --- a/crates/matrix-sdk-ui/src/timeline/builder.rs +++ b/crates/matrix-sdk-ui/src/timeline/builder.rs @@ -111,7 +111,7 @@ impl TimelineBuilder { .await { Ok(Some(read_receipt)) => { - inner.set_initial_user_receipt(ReceiptType::Read, read_receipt); + inner.set_initial_user_receipt(ReceiptType::Read, read_receipt).await; } Err(e) => { error!("Failed to get public read receipt of own user from the store: {e}"); @@ -128,7 +128,9 @@ impl TimelineBuilder { .await { Ok(Some(private_read_receipt)) => { - inner.set_initial_user_receipt(ReceiptType::ReadPrivate, private_read_receipt); + inner + .set_initial_user_receipt(ReceiptType::ReadPrivate, private_read_receipt) + .await; } Err(e) => { error!("Failed to get private read receipt of own user from the store: {e}"); @@ -144,7 +146,6 @@ impl TimelineBuilder { inner.load_fully_read_event().await; } - let inner = Arc::new(inner); let room = inner.room(); let client = room.client(); diff --git a/crates/matrix-sdk-ui/src/timeline/inner.rs b/crates/matrix-sdk-ui/src/timeline/inner.rs index 9e54e46a1..9f1d758e2 100644 --- a/crates/matrix-sdk-ui/src/timeline/inner.rs +++ b/crates/matrix-sdk-ui/src/timeline/inner.rs @@ -69,9 +69,9 @@ use super::{ }; use crate::events::SyncTimelineEventWithoutContent; -#[derive(Debug)] +#[derive(Clone, Debug)] pub(super) struct TimelineInner { - state: Mutex, + state: Arc>, room_data_provider: P, track_read_receipts: bool, } @@ -128,7 +128,7 @@ impl TimelineInner

{ items: ObservableVector::with_capacity(32), ..Default::default() }; - Self { state: Mutex::new(state), room_data_provider, track_read_receipts: false } + Self { state: Arc::new(Mutex::new(state)), room_data_provider, track_read_receipts: false } } pub(super) fn with_read_receipt_tracking(mut self, track_read_receipts: bool) -> Self { @@ -280,14 +280,15 @@ impl TimelineInner

{ Ok(result) } - pub(super) fn set_initial_user_receipt( + pub(super) async fn set_initial_user_receipt( &mut self, receipt_type: ReceiptType, receipt: (OwnedEventId, Receipt), ) { let own_user_id = self.room_data_provider.own_user_id().to_owned(); self.state - .get_mut() + .lock() + .await .users_read_receipts .entry(own_user_id) .or_default() @@ -302,7 +303,7 @@ impl TimelineInner

{ debug!("Adding {} initial events", events.len()); - let state = self.state.get_mut(); + let mut state = self.state.lock().await; for event in events { state .handle_remote_event( @@ -699,34 +700,32 @@ impl TimelineInner

{ pub(super) async fn retry_event_decryption( &self, room: &room::Common, - session_ids: Option>, + session_ids: Option>, ) { - self.retry_event_decryption_inner(room, session_ids).await + self.retry_event_decryption_inner(room.to_owned(), session_ids).await } #[cfg(all(test, feature = "e2e-encryption"))] pub(super) async fn retry_event_decryption_test( &self, room_id: &RoomId, - olm_machine: &OlmMachine, - session_ids: Option>, + olm_machine: OlmMachine, + session_ids: Option>, ) { - self.retry_event_decryption_inner((olm_machine, room_id), session_ids).await + self.retry_event_decryption_inner((olm_machine, room_id.to_owned()), session_ids).await } #[cfg(feature = "e2e-encryption")] async fn retry_event_decryption_inner( &self, decryptor: impl Decryptor, - session_ids: Option>, + session_ids: Option>, ) { use super::EncryptedMessage; - trace!("Retrying decryption"); + let mut state = self.state.clone().lock_owned().await; - let push_rules_context = self.room_data_provider.push_rules_and_context().await; - - let should_retry = |session_id: &str| { + let should_retry = move |session_id: &str| { if let Some(session_ids) = &session_ids { session_ids.contains(session_id) } else { @@ -734,82 +733,112 @@ impl TimelineInner

{ } }; - let retry_one = |item: Arc| { - async move { - let event_item = item.as_event()?; + let retry_indices: Vec<_> = state + .items + .iter() + .enumerate() + .filter_map(|(idx, item)| match item.as_event()?.content().as_unable_to_decrypt()? { + EncryptedMessage::MegolmV1AesSha2 { session_id, .. } + if should_retry(session_id) => + { + Some(idx) + } + EncryptedMessage::MegolmV1AesSha2 { .. } + | EncryptedMessage::OlmV1Curve25519AesSha2 { .. } + | EncryptedMessage::Unknown => None, + }) + .collect(); - let session_id = match event_item.content().as_unable_to_decrypt()? { - EncryptedMessage::MegolmV1AesSha2 { session_id, .. } - if should_retry(session_id) => - { - session_id - } - EncryptedMessage::MegolmV1AesSha2 { .. } - | EncryptedMessage::OlmV1Curve25519AesSha2 { .. } - | EncryptedMessage::Unknown => return None, - }; + if retry_indices.is_empty() { + return; + } - tracing::Span::current().record("session_id", session_id); + debug!("Retrying decryption"); - let Some(remote_event) = event_item.as_remote() else { - error!("Key for unable-to-decrypt timeline item is not an event ID"); - return None; - }; + let track_read_receipts = self.track_read_receipts; + let room_data_provider = self.room_data_provider.clone(); + let push_rules_context = room_data_provider.push_rules_and_context().await; - tracing::Span::current().record("event_id", debug(&remote_event.event_id)); + matrix_sdk::executor::spawn(async move { + let retry_one = |item: Arc| { + let decryptor = decryptor.clone(); + let should_retry = &should_retry; + async move { + let event_item = item.as_event()?; - match decryptor.decrypt_event_impl(&remote_event.original_json).await { - Ok(event) => { - trace!("Successfully decrypted event that previously failed to decrypt"); - Some(event) - } - Err(e) => { - info!("Failed to decrypt event after receiving room key: {e}"); - None + let session_id = match event_item.content().as_unable_to_decrypt()? { + EncryptedMessage::MegolmV1AesSha2 { session_id, .. } + if should_retry(session_id) => + { + session_id + } + EncryptedMessage::MegolmV1AesSha2 { .. } + | EncryptedMessage::OlmV1Curve25519AesSha2 { .. } + | EncryptedMessage::Unknown => return None, + }; + + tracing::Span::current().record("session_id", session_id); + + let Some(remote_event) = event_item.as_remote() else { + error!("Key for unable-to-decrypt timeline item is not an event ID"); + return None; + }; + + tracing::Span::current().record("event_id", debug(&remote_event.event_id)); + + match decryptor.decrypt_event_impl(&remote_event.original_json).await { + Ok(event) => { + trace!( + "Successfully decrypted event that previously failed to decrypt" + ); + Some(event) + } + Err(e) => { + info!("Failed to decrypt event after receiving room key: {e}"); + None + } } } - } - .instrument(info_span!( - "retry_one", - session_id = field::Empty, - event_id = field::Empty - )) - }; - - let mut state = self.state.lock().await; - - // We loop through all the items in the timeline, if we successfully - // decrypt a UTD item we either replace it or remove it and update - // another one. - let mut idx = 0; - while let Some(item) = state.items.get(idx) { - let Some(mut event) = retry_one(item.clone()).await else { - idx += 1; - continue; + .instrument(info_span!( + "retry_one", + session_id = field::Empty, + event_id = field::Empty + )) }; - event.push_actions = push_rules_context - .as_ref() - .map(|(push_rules, push_context)| { - push_rules.get_actions(&event.event, push_context).to_owned() - }) - .unwrap_or_default(); + // Loop through all the indices, in order so we don't decrypt edits + // before the event being edited, if both were UTD. Keep track of + // index change as UTDs are removed instead of updated. + let mut offset = 0; + for idx in retry_indices { + let idx = idx - offset; + let Some(mut event) = retry_one(state.items[idx].clone()).await else { + continue; + }; - let result = state - .handle_remote_event( - event.into(), - TimelineItemPosition::Update(idx), - &self.room_data_provider, - self.track_read_receipts, - ) - .await; + event.push_actions = push_rules_context + .as_ref() + .map(|(push_rules, push_context)| { + push_rules.get_actions(&event.event, push_context).to_owned() + }) + .unwrap_or_default(); - // If the UTD was removed rather than updated, run the loop again - // with the same index. - if !result.item_removed { - idx += 1; + let result = state + .handle_remote_event( + event.into(), + TimelineItemPosition::Update(idx), + &room_data_provider, + track_read_receipts, + ) + .await; + + // If the UTD was removed rather than updated, offset all + // subsequent loop iterations. + if result.item_removed { + offset += 1; + } } - } + }); } pub(super) async fn set_sender_profiles_pending(&self) { diff --git a/crates/matrix-sdk-ui/src/timeline/mod.rs b/crates/matrix-sdk-ui/src/timeline/mod.rs index 770014602..0ab6c6578 100644 --- a/crates/matrix-sdk-ui/src/timeline/mod.rs +++ b/crates/matrix-sdk-ui/src/timeline/mod.rs @@ -97,7 +97,7 @@ const DEFAULT_SANITIZER_MODE: HtmlSanitizerMode = HtmlSanitizerMode::Compat; /// messages. #[derive(Debug)] pub struct Timeline { - inner: Arc>, + inner: TimelineInner, start_token: Arc>>, start_token_condvar: Arc, @@ -266,14 +266,14 @@ impl Timeline { /// # anyhow::Ok(()) }; /// ``` #[cfg(feature = "e2e-encryption")] - pub async fn retry_decryption<'a, S: AsRef + 'a>( - &'a self, - session_ids: impl IntoIterator, + pub async fn retry_decryption>( + &self, + session_ids: impl IntoIterator, ) { self.inner .retry_event_decryption( self.room(), - Some(session_ids.into_iter().map(AsRef::as_ref).collect()), + Some(session_ids.into_iter().map(Into::into).collect()), ) .await; } diff --git a/crates/matrix-sdk-ui/src/timeline/queue.rs b/crates/matrix-sdk-ui/src/timeline/queue.rs index f210692ff..74fea6f92 100644 --- a/crates/matrix-sdk-ui/src/timeline/queue.rs +++ b/crates/matrix-sdk-ui/src/timeline/queue.rs @@ -44,7 +44,7 @@ pub(super) struct LocalMessage { #[instrument(skip_all, fields(room_id = ?room.room_id()))] pub(super) async fn send_queued_messages( - timeline_inner: Arc, + timeline_inner: TimelineInner, room: room::Common, mut msg_receiver: Receiver, ) { @@ -100,7 +100,7 @@ async fn handle_message( room: room::Common, send_task: &mut SendMessageTask, queue: &mut VecDeque, - timeline_inner: &Arc, + timeline_inner: &TimelineInner, ) { if queue.is_empty() && send_task.is_idle() { match Room::from(room) { @@ -129,7 +129,7 @@ async fn handle_task_ready( result: SendMessageResult, send_task: &mut SendMessageTask, queue: &mut VecDeque, - timeline_inner: &Arc, + timeline_inner: &TimelineInner, ) { match result { SendMessageResult::Success { room } => { @@ -198,7 +198,7 @@ impl SendMessageTask { matches!(self, Self::Idle) } - fn start(&mut self, room: room::Joined, timeline_inner: Arc, msg: LocalMessage) { + fn start(&mut self, room: room::Joined, timeline_inner: TimelineInner, msg: LocalMessage) { debug!("Spawning message-sending task"); let txn_id = msg.txn_id.clone(); let join_handle = spawn(async move { diff --git a/crates/matrix-sdk-ui/src/timeline/tests/encryption.rs b/crates/matrix-sdk-ui/src/timeline/tests/encryption.rs index da4dc44ec..ac6f06307 100644 --- a/crates/matrix-sdk-ui/src/timeline/tests/encryption.rs +++ b/crates/matrix-sdk-ui/src/timeline/tests/encryption.rs @@ -14,7 +14,7 @@ #![cfg(not(target_arch = "wasm32"))] -use std::{collections::BTreeSet, io::Cursor, iter}; +use std::{io::Cursor, iter}; use assert_matches::assert_matches; use eyeball_im::VectorDiff; @@ -101,8 +101,8 @@ async fn retry_message_decryption() { .inner .retry_event_decryption_test( room_id!("!DovneieKSTkdHKpIXy:morpheus.localhost"), - &olm_machine, - Some(iter::once(SESSION_ID).collect()), + olm_machine, + Some(iter::once(SESSION_ID.to_owned()).collect()), ) .await; @@ -192,6 +192,9 @@ async fn retry_edit_decryption() { ) .await; + let items = timeline.inner.items().await; + assert_eq!(items.len(), 3); + let mut keys = decrypt_room_key_export(Cursor::new(SESSION1_KEY), "1234").unwrap(); keys.extend(decrypt_room_key_export(Cursor::new(SESSION2_KEY), "1234").unwrap()); @@ -203,7 +206,7 @@ async fn retry_edit_decryption() { .inner .retry_event_decryption_test( room_id!("!bdsREiCPHyZAPkpXer:morpheus.localhost"), - &olm_machine, + olm_machine, None, ) .await; @@ -306,8 +309,8 @@ async fn retry_edit_and_more() { .inner .retry_event_decryption_test( room_id!("!wFnAUSQbxMcfIMgvNX:flipdot.org"), - &olm_machine, - Some(BTreeSet::from_iter([SESSION_ID])), + olm_machine, + Some(iter::once(SESSION_ID.to_owned()).collect()), ) .await; @@ -391,8 +394,8 @@ async fn retry_message_decryption_highlighted() { .inner .retry_event_decryption_test( room_id!("!rYtFvMGENJleNQVJzb:matrix.org"), - &olm_machine, - Some(iter::once(SESSION_ID).collect()), + olm_machine, + Some(iter::once(SESSION_ID.to_owned()).collect()), ) .await; diff --git a/crates/matrix-sdk-ui/src/timeline/tests/mod.rs b/crates/matrix-sdk-ui/src/timeline/tests/mod.rs index 673475ed8..5fa0dbc07 100644 --- a/crates/matrix-sdk-ui/src/timeline/tests/mod.rs +++ b/crates/matrix-sdk-ui/src/timeline/tests/mod.rs @@ -367,6 +367,7 @@ impl TestTimeline { } } +#[derive(Clone)] struct TestRoomDataProvider; #[async_trait] diff --git a/crates/matrix-sdk-ui/src/timeline/to_device.rs b/crates/matrix-sdk-ui/src/timeline/to_device.rs index b30f204cc..a374322c3 100644 --- a/crates/matrix-sdk-ui/src/timeline/to_device.rs +++ b/crates/matrix-sdk-ui/src/timeline/to_device.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::{iter, sync::Arc}; +use std::iter; use matrix_sdk::{event_handler::EventHandler, Client}; use ruma::{ @@ -24,7 +24,7 @@ use tracing::{debug_span, error, trace, Instrument}; use super::inner::TimelineInner; pub(super) fn handle_room_key_event( - inner: Arc, + inner: TimelineInner, room_id: OwnedRoomId, ) -> impl EventHandler { move |event: ToDeviceRoomKeyEvent, client: Client| { @@ -40,7 +40,7 @@ pub(super) fn handle_room_key_event( } pub(super) fn handle_forwarded_room_key_event( - inner: Arc, + inner: TimelineInner, room_id: OwnedRoomId, ) -> impl EventHandler { move |event: ToDeviceForwardedRoomKeyEvent, client: Client| { @@ -57,7 +57,7 @@ pub(super) fn handle_forwarded_room_key_event( async fn retry_decryption( client: Client, - inner: Arc, + inner: TimelineInner, room_id: OwnedRoomId, event_room_id: OwnedRoomId, session_id: String, @@ -75,5 +75,5 @@ async fn retry_decryption( return; }; - inner.retry_event_decryption(&room, Some(iter::once(session_id.as_str()).collect())).await; + inner.retry_event_decryption(&room, Some(iter::once(session_id).collect())).await; } diff --git a/crates/matrix-sdk-ui/src/timeline/traits.rs b/crates/matrix-sdk-ui/src/timeline/traits.rs index 99337a0f1..24fbde7df 100644 --- a/crates/matrix-sdk-ui/src/timeline/traits.rs +++ b/crates/matrix-sdk-ui/src/timeline/traits.rs @@ -47,7 +47,7 @@ impl RoomExt for room::Common { } #[async_trait] -pub(super) trait RoomDataProvider { +pub(super) trait RoomDataProvider: Clone + Send + Sync + 'static { fn own_user_id(&self) -> &UserId; async fn profile(&self, user_id: &UserId) -> Option; async fn read_receipts_for_event(&self, event_id: &EventId) -> IndexMap; @@ -115,13 +115,13 @@ impl RoomDataProvider for room::Common { // object, which is annoying to create for testing and not really needed #[cfg(feature = "e2e-encryption")] #[async_trait] -pub(super) trait Decryptor: Copy { +pub(super) trait Decryptor: Clone + Send + Sync + 'static { async fn decrypt_event_impl(&self, raw: &Raw) -> Result; } #[cfg(feature = "e2e-encryption")] #[async_trait] -impl Decryptor for &room::Common { +impl Decryptor for room::Common { async fn decrypt_event_impl(&self, raw: &Raw) -> Result { self.decrypt_event(raw.cast_ref()).await } @@ -129,7 +129,7 @@ impl Decryptor for &room::Common { #[cfg(all(test, feature = "e2e-encryption"))] #[async_trait] -impl Decryptor for (&matrix_sdk_base::crypto::OlmMachine, &ruma::RoomId) { +impl Decryptor for (matrix_sdk_base::crypto::OlmMachine, ruma::OwnedRoomId) { async fn decrypt_event_impl(&self, raw: &Raw) -> Result { let (olm_machine, room_id) = self; let event = olm_machine.decrypt_room_event(raw.cast_ref(), room_id).await?;