diff --git a/crates/matrix-sdk-ui/Cargo.toml b/crates/matrix-sdk-ui/Cargo.toml index 0026107d5..ff89ec8d3 100644 --- a/crates/matrix-sdk-ui/Cargo.toml +++ b/crates/matrix-sdk-ui/Cargo.toml @@ -14,7 +14,7 @@ rustls-tls = ["matrix-sdk/rustls-tls"] experimental-room-list = ["experimental-sliding-sync", "dep:async-stream", "dep:eyeball-im-util"] experimental-sliding-sync = ["matrix-sdk/experimental-sliding-sync"] -testing = ["matrix-sdk/testing"] +testing = ["matrix-sdk/testing", "dep:eyeball-im-util"] [dependencies] async-once-cell = "0.5.2" diff --git a/crates/matrix-sdk-ui/src/timeline/inner.rs b/crates/matrix-sdk-ui/src/timeline/inner.rs index 5494c8fae..082ddf8b6 100644 --- a/crates/matrix-sdk-ui/src/timeline/inner.rs +++ b/crates/matrix-sdk-ui/src/timeline/inner.rs @@ -18,6 +18,8 @@ use std::{collections::HashMap, sync::Arc}; use async_trait::async_trait; use eyeball_im::{ObservableVector, VectorSubscriber}; +#[cfg(feature = "testing")] +use eyeball_im_util::{FilterMapVectorSubscriber, VectorExt}; use imbl::Vector; use indexmap::{IndexMap, IndexSet}; #[cfg(all(test, feature = "e2e-encryption"))] @@ -126,6 +128,20 @@ impl TimelineInner

{ (items, stream) } + #[cfg(feature = "testing")] + pub(super) async fn subscribe_filter_map( + &self, + f: F, + ) -> (Vector, FilterMapVectorSubscriber, F>) + where + U: Clone, + F: Fn(Arc) -> Option, + { + trace!("Creating timeline items signal"); + let state = self.state.lock().await; + state.items.subscribe_filter_map(f) + } + pub(super) fn set_initial_user_receipt( &mut self, receipt_type: ReceiptType, diff --git a/crates/matrix-sdk-ui/src/timeline/mod.rs b/crates/matrix-sdk-ui/src/timeline/mod.rs index 768c40f9e..d1fb5f0bd 100644 --- a/crates/matrix-sdk-ui/src/timeline/mod.rs +++ b/crates/matrix-sdk-ui/src/timeline/mod.rs @@ -18,7 +18,7 @@ use std::{fs, path::Path, pin::Pin, sync::Arc, task::Poll}; -use eyeball_im::{VectorDiff, VectorSubscriber}; +use eyeball_im::VectorDiff; use futures_core::Stream; use futures_util::TryFutureExt; use imbl::Vector; @@ -244,6 +244,16 @@ impl Timeline { (items, stream) } + #[cfg(feature = "testing")] + pub async fn subscribe_filter_map( + &self, + f: impl Fn(Arc) -> Option, + ) -> (Vector, impl Stream>) { + let (items, stream) = self.inner.subscribe_filter_map(f).await; + let stream = TimelineStream::new(stream, self.drop_handle.clone()); + (items, stream) + } + /// Send a message to the room, and add it to the timeline as a local echo. /// /// For simplicity, this method doesn't currently allow custom message @@ -552,24 +562,21 @@ impl Drop for TimelineDropHandle { } pin_project! { - struct TimelineStream { + struct TimelineStream { #[pin] - inner: VectorSubscriber>, + inner: S, event_handler_handles: Arc, } } -impl TimelineStream { - fn new( - inner: VectorSubscriber>, - event_handler_handles: Arc, - ) -> Self { +impl TimelineStream { + fn new(inner: S, event_handler_handles: Arc) -> Self { Self { inner, event_handler_handles } } } -impl Stream for TimelineStream { - type Item = VectorDiff>; +impl Stream for TimelineStream { + type Item = S::Item; fn poll_next( self: Pin<&mut Self>, diff --git a/crates/matrix-sdk-ui/tests/integration/timeline/echo.rs b/crates/matrix-sdk-ui/tests/integration/timeline/echo.rs index a8b22a041..1f8f4f808 100644 --- a/crates/matrix-sdk-ui/tests/integration/timeline/echo.rs +++ b/crates/matrix-sdk-ui/tests/integration/timeline/echo.rs @@ -133,6 +133,60 @@ async fn echo() { assert_eq!(item.timestamp(), MilliSecondsSinceUnixEpoch(uint!(152038280))); } +#[async_test] +async fn retry_failed() { + let room_id = room_id!("!a98sd12bjh:example.org"); + let (client, server) = logged_in_client().await; + let sync_settings = SyncSettings::new().timeout(Duration::from_millis(3000)); + + let mut ev_builder = EventBuilder::new(); + ev_builder.add_joined_room(JoinedRoomBuilder::new(room_id)); + + mock_sync(&server, ev_builder.build_json_sync_response(), None).await; + let _response = client.sync_once(sync_settings.clone()).await.unwrap(); + server.reset().await; + + let room = client.get_room(room_id).unwrap(); + let timeline = Arc::new(room.timeline().await); + let (_, mut timeline_stream) = + timeline.subscribe_filter_map(|item| item.as_event().cloned()).await; + + let event_id = event_id!("$wWgymRfo7ri1uQx0NXO40vLJ"); + let txn_id: &TransactionId = "my-txn-id".into(); + + timeline.send(RoomMessageEventContent::text_plain("Hello, World!").into(), Some(txn_id)).await; + + // First, local echo is added + assert_next_matches!(timeline_stream, VectorDiff::PushBack { value } => { + assert_matches!(value.send_state(), Some(EventSendState::NotSentYet)); + }); + + // Sending fails, the mock server has no matching route yet + assert_next_matches!(timeline_stream, VectorDiff::Set { index: 0, value } => { + assert_matches!(value.send_state(), Some(EventSendState::SendingFailed { .. })); + }); + + Mock::given(method("PUT")) + .and(path_regex(r"^/_matrix/client/r0/rooms/.*/send/.*")) + .and(header("authorization", "Bearer 1234")) + .respond_with(ResponseTemplate::new(200).set_body_json(&json!({ "event_id": event_id }))) + .mount(&server) + .await; + + timeline.retry_send(txn_id).await.unwrap(); + + // After mocking the endpoint and retrying, it first transitions back out of + // the error state + assert_next_matches!(timeline_stream, VectorDiff::Set { index: 0, value } => { + assert_matches!(value.send_state(), Some(EventSendState::NotSentYet)); + }); + + // … before succeeding. + assert_next_matches!(timeline_stream, VectorDiff::Set { index: 0, value } => { + assert_matches!(value.send_state(), Some(EventSendState::Sent { .. })); + }); +} + #[async_test] async fn dedup_by_event_id_late() { let room_id = room_id!("!a98sd12bjh:example.org");