From 9b802998b3247b83d87380b03e60bc80f1f0aec0 Mon Sep 17 00:00:00 2001 From: Jonas Platte Date: Fri, 23 Jun 2023 10:05:36 +0200 Subject: [PATCH] ui: Put retries in the message-sending queue --- crates/matrix-sdk-ui/src/timeline/mod.rs | 22 +--- .../tests/integration/timeline/queue.rs | 104 +++++++++++++++++- 2 files changed, 107 insertions(+), 19 deletions(-) diff --git a/crates/matrix-sdk-ui/src/timeline/mod.rs b/crates/matrix-sdk-ui/src/timeline/mod.rs index 9900b5d91..507a4206a 100644 --- a/crates/matrix-sdk-ui/src/timeline/mod.rs +++ b/crates/matrix-sdk-ui/src/timeline/mod.rs @@ -386,24 +386,10 @@ impl Timeline { } }; - let send_state = match Room::from(self.room().clone()) { - Room::Joined(room) => { - let response = room.send(content, Some(txn_id)).await; - - match response { - Ok(response) => EventSendState::Sent { event_id: response.event_id }, - Err(error) => EventSendState::SendingFailed { error: Arc::new(error) }, - } - } - _ => { - EventSendState::SendingFailed { - // FIXME: Probably not exactly right - error: Arc::new(matrix_sdk::Error::InconsistentState), - } - } - }; - - self.inner.update_event_send_state(txn_id, send_state).await; + let txn_id = txn_id.to_owned(); + if self.msg_sender.send(LocalMessage { content, txn_id }).await.is_err() { + error!("Internal error: timeline message receiver is closed"); + } Ok(()) } diff --git a/crates/matrix-sdk-ui/tests/integration/timeline/queue.rs b/crates/matrix-sdk-ui/tests/integration/timeline/queue.rs index ad6d27da4..969857c67 100644 --- a/crates/matrix-sdk-ui/tests/integration/timeline/queue.rs +++ b/crates/matrix-sdk-ui/tests/integration/timeline/queue.rs @@ -14,10 +14,12 @@ use std::{sync::Arc, time::Duration}; +use assert_matches::assert_matches; use eyeball_im::VectorDiff; +use futures_util::StreamExt; use matrix_sdk::config::SyncSettings; use matrix_sdk_test::{async_test, EventBuilder, JoinedRoomBuilder}; -use matrix_sdk_ui::timeline::RoomExt; +use matrix_sdk_ui::timeline::{EventSendState, RoomExt}; use ruma::{events::room::message::RoomMessageEventContent, room_id}; use serde_json::json; use stream_assert::{assert_next_matches, assert_pending}; @@ -100,3 +102,103 @@ async fn message_order() { }); assert_pending!(timeline_stream); } + +#[async_test] +async fn retry_order() { + let room_id = room_id!("!a98sd12bjh:example.org"); + let (client, server) = logged_in_client().await; + let sync_settings = SyncSettings::new().timeout(Duration::from_millis(3000)); + + let mut ev_builder = EventBuilder::new(); + ev_builder.add_joined_room(JoinedRoomBuilder::new(room_id)); + + mock_sync(&server, ev_builder.build_json_sync_response(), None).await; + let _response = client.sync_once(sync_settings.clone()).await.unwrap(); + server.reset().await; + + mock_encryption_state(&server, false).await; + + let room = client.get_room(room_id).unwrap(); + let timeline = Arc::new(room.timeline().await); + let (_, mut timeline_stream) = + timeline.subscribe_filter_map(|item| item.as_event().cloned()).await; + + // Send two messages without mocking the server response. + // It will respond with a 404, resulting in a failed-to-send state. + timeline.send(RoomMessageEventContent::text_plain("First!").into(), Some("1".into())).await; + timeline.send(RoomMessageEventContent::text_plain("Second.").into(), Some("2".into())).await; + + // Local echoes are available as soon as `timeline.send` returns + assert_next_matches!(timeline_stream, VectorDiff::PushBack { value } => { + assert_eq!(value.content().as_message().unwrap().body(), "First!"); + }); + assert_next_matches!(timeline_stream, VectorDiff::PushBack { value } => { + assert_eq!(value.content().as_message().unwrap().body(), "Second."); + }); + + // Local echoes are updated with the failed send state as soon as + // the 404 response is received + assert_matches!(timeline_stream.next().await, Some(VectorDiff::Set { index: 0, value }) => { + assert_matches!(value.send_state().unwrap(), EventSendState::SendingFailed { .. }); + }); + // The second one is cancelled without an extra delay + assert_next_matches!(timeline_stream, VectorDiff::Set { index: 1, value } => { + assert_matches!(value.send_state().unwrap(), EventSendState::Cancelled); + }); + + // Response for first message takes 100ms to respond + Mock::given(method("PUT")) + .and(path_regex(r"^/_matrix/client/r0/rooms/.*/send/.*")) + .and(body_string_contains("First!")) + .respond_with( + ResponseTemplate::new(200) + .set_body_json(&json!({ "event_id": "$PyHxV5mYzjetBUT3qZq7V95GOzxb02EP" })) + .set_delay(Duration::from_millis(100)), + ) + .mount(&server) + .await; + + // Response for second message takes 200ms to respond, so should come back + // after first if we don't serialize retries + Mock::given(method("PUT")) + .and(path_regex(r"^/_matrix/client/r0/rooms/.*/send/.*")) + .and(body_string_contains("Second.")) + .respond_with( + ResponseTemplate::new(200) + .set_body_json(&json!({ "event_id": "$5E2kLK/Sg342bgBU9ceEIEPYpbFaqJpZ" })) + .set_delay(Duration::from_millis(200)), + ) + .mount(&server) + .await; + + // Retry the second message first + timeline.retry_send("2".into()).await.unwrap(); + timeline.retry_send("1".into()).await.unwrap(); + + // Both items are immediately updated to indicate they are being sent + assert_next_matches!(timeline_stream, VectorDiff::Set { index: 1, value } => { + assert_matches!(value.send_state().unwrap(), EventSendState::NotSentYet); + assert_eq!(value.content().as_message().unwrap().body(), "Second."); + }); + assert_next_matches!(timeline_stream, VectorDiff::Set { index: 0, value } => { + assert_matches!(value.send_state().unwrap(), EventSendState::NotSentYet); + assert_eq!(value.content().as_message().unwrap().body(), "First!"); + }); + + // Wait 200ms for the first msg, 100ms for the second, 100ms for overhead + sleep(Duration::from_millis(400)).await; + + // The second item should be updated first, since it was retried first + assert_next_matches!(timeline_stream, VectorDiff::Set { index: 1, value } => { + assert_eq!(value.content().as_message().unwrap().body(), "Second."); + assert_matches!(value.send_state().unwrap(), EventSendState::Sent { .. }); + assert_eq!(value.event_id().unwrap(), "$5E2kLK/Sg342bgBU9ceEIEPYpbFaqJpZ"); + }); + // Then the first one + assert_next_matches!(timeline_stream, VectorDiff::Set { index: 0, value } => { + assert_eq!(value.content().as_message().unwrap().body(), "First!"); + assert_matches!(value.send_state().unwrap(), EventSendState::Sent { .. }); + assert_eq!(value.event_id().unwrap(), "$PyHxV5mYzjetBUT3qZq7V95GOzxb02EP"); + }); + assert_pending!(timeline_stream); +}