ui: Add a test for retrying a failed send

This commit is contained in:
Jonas Platte
2023-06-07 13:03:53 +02:00
committed by Jonas Platte
parent 0ca8369a76
commit 415c13fa90
4 changed files with 88 additions and 11 deletions

View File

@@ -14,7 +14,7 @@ rustls-tls = ["matrix-sdk/rustls-tls"]
experimental-room-list = ["experimental-sliding-sync", "dep:async-stream", "dep:eyeball-im-util"]
experimental-sliding-sync = ["matrix-sdk/experimental-sliding-sync"]
testing = ["matrix-sdk/testing"]
testing = ["matrix-sdk/testing", "dep:eyeball-im-util"]
[dependencies]
async-once-cell = "0.5.2"

View File

@@ -18,6 +18,8 @@ use std::{collections::HashMap, sync::Arc};
use async_trait::async_trait;
use eyeball_im::{ObservableVector, VectorSubscriber};
#[cfg(feature = "testing")]
use eyeball_im_util::{FilterMapVectorSubscriber, VectorExt};
use imbl::Vector;
use indexmap::{IndexMap, IndexSet};
#[cfg(all(test, feature = "e2e-encryption"))]
@@ -126,6 +128,20 @@ impl<P: RoomDataProvider> TimelineInner<P> {
(items, stream)
}
#[cfg(feature = "testing")]
pub(super) async fn subscribe_filter_map<U, F>(
&self,
f: F,
) -> (Vector<U>, FilterMapVectorSubscriber<Arc<TimelineItem>, F>)
where
U: Clone,
F: Fn(Arc<TimelineItem>) -> Option<U>,
{
trace!("Creating timeline items signal");
let state = self.state.lock().await;
state.items.subscribe_filter_map(f)
}
pub(super) fn set_initial_user_receipt(
&mut self,
receipt_type: ReceiptType,

View File

@@ -18,7 +18,7 @@
use std::{fs, path::Path, pin::Pin, sync::Arc, task::Poll};
use eyeball_im::{VectorDiff, VectorSubscriber};
use eyeball_im::VectorDiff;
use futures_core::Stream;
use futures_util::TryFutureExt;
use imbl::Vector;
@@ -244,6 +244,16 @@ impl Timeline {
(items, stream)
}
#[cfg(feature = "testing")]
pub async fn subscribe_filter_map<U: Clone>(
&self,
f: impl Fn(Arc<TimelineItem>) -> Option<U>,
) -> (Vector<U>, impl Stream<Item = VectorDiff<U>>) {
let (items, stream) = self.inner.subscribe_filter_map(f).await;
let stream = TimelineStream::new(stream, self.drop_handle.clone());
(items, stream)
}
/// Send a message to the room, and add it to the timeline as a local echo.
///
/// For simplicity, this method doesn't currently allow custom message
@@ -552,24 +562,21 @@ impl Drop for TimelineDropHandle {
}
pin_project! {
struct TimelineStream {
struct TimelineStream<S> {
#[pin]
inner: VectorSubscriber<Arc<TimelineItem>>,
inner: S,
event_handler_handles: Arc<TimelineDropHandle>,
}
}
impl TimelineStream {
fn new(
inner: VectorSubscriber<Arc<TimelineItem>>,
event_handler_handles: Arc<TimelineDropHandle>,
) -> Self {
impl<S> TimelineStream<S> {
fn new(inner: S, event_handler_handles: Arc<TimelineDropHandle>) -> Self {
Self { inner, event_handler_handles }
}
}
impl Stream for TimelineStream {
type Item = VectorDiff<Arc<TimelineItem>>;
impl<S: Stream> Stream for TimelineStream<S> {
type Item = S::Item;
fn poll_next(
self: Pin<&mut Self>,

View File

@@ -133,6 +133,60 @@ async fn echo() {
assert_eq!(item.timestamp(), MilliSecondsSinceUnixEpoch(uint!(152038280)));
}
#[async_test]
async fn retry_failed() {
let room_id = room_id!("!a98sd12bjh:example.org");
let (client, server) = logged_in_client().await;
let sync_settings = SyncSettings::new().timeout(Duration::from_millis(3000));
let mut ev_builder = EventBuilder::new();
ev_builder.add_joined_room(JoinedRoomBuilder::new(room_id));
mock_sync(&server, ev_builder.build_json_sync_response(), None).await;
let _response = client.sync_once(sync_settings.clone()).await.unwrap();
server.reset().await;
let room = client.get_room(room_id).unwrap();
let timeline = Arc::new(room.timeline().await);
let (_, mut timeline_stream) =
timeline.subscribe_filter_map(|item| item.as_event().cloned()).await;
let event_id = event_id!("$wWgymRfo7ri1uQx0NXO40vLJ");
let txn_id: &TransactionId = "my-txn-id".into();
timeline.send(RoomMessageEventContent::text_plain("Hello, World!").into(), Some(txn_id)).await;
// First, local echo is added
assert_next_matches!(timeline_stream, VectorDiff::PushBack { value } => {
assert_matches!(value.send_state(), Some(EventSendState::NotSentYet));
});
// Sending fails, the mock server has no matching route yet
assert_next_matches!(timeline_stream, VectorDiff::Set { index: 0, value } => {
assert_matches!(value.send_state(), Some(EventSendState::SendingFailed { .. }));
});
Mock::given(method("PUT"))
.and(path_regex(r"^/_matrix/client/r0/rooms/.*/send/.*"))
.and(header("authorization", "Bearer 1234"))
.respond_with(ResponseTemplate::new(200).set_body_json(&json!({ "event_id": event_id })))
.mount(&server)
.await;
timeline.retry_send(txn_id).await.unwrap();
// After mocking the endpoint and retrying, it first transitions back out of
// the error state
assert_next_matches!(timeline_stream, VectorDiff::Set { index: 0, value } => {
assert_matches!(value.send_state(), Some(EventSendState::NotSentYet));
});
// … before succeeding.
assert_next_matches!(timeline_stream, VectorDiff::Set { index: 0, value } => {
assert_matches!(value.send_state(), Some(EventSendState::Sent { .. }));
});
}
#[async_test]
async fn dedup_by_event_id_late() {
let room_id = room_id!("!a98sd12bjh:example.org");