ui: Put retries in the message-sending queue

This commit is contained in:
Jonas Platte
2023-06-23 10:05:36 +02:00
committed by Jonas Platte
parent 0a37ce50a8
commit 9b802998b3
2 changed files with 107 additions and 19 deletions

View File

@@ -386,24 +386,10 @@ impl Timeline {
}
};
let send_state = match Room::from(self.room().clone()) {
Room::Joined(room) => {
let response = room.send(content, Some(txn_id)).await;
match response {
Ok(response) => EventSendState::Sent { event_id: response.event_id },
Err(error) => EventSendState::SendingFailed { error: Arc::new(error) },
}
}
_ => {
EventSendState::SendingFailed {
// FIXME: Probably not exactly right
error: Arc::new(matrix_sdk::Error::InconsistentState),
}
}
};
self.inner.update_event_send_state(txn_id, send_state).await;
let txn_id = txn_id.to_owned();
if self.msg_sender.send(LocalMessage { content, txn_id }).await.is_err() {
error!("Internal error: timeline message receiver is closed");
}
Ok(())
}

View File

@@ -14,10 +14,12 @@
use std::{sync::Arc, time::Duration};
use assert_matches::assert_matches;
use eyeball_im::VectorDiff;
use futures_util::StreamExt;
use matrix_sdk::config::SyncSettings;
use matrix_sdk_test::{async_test, EventBuilder, JoinedRoomBuilder};
use matrix_sdk_ui::timeline::RoomExt;
use matrix_sdk_ui::timeline::{EventSendState, RoomExt};
use ruma::{events::room::message::RoomMessageEventContent, room_id};
use serde_json::json;
use stream_assert::{assert_next_matches, assert_pending};
@@ -100,3 +102,103 @@ async fn message_order() {
});
assert_pending!(timeline_stream);
}
#[async_test]
async fn retry_order() {
let room_id = room_id!("!a98sd12bjh:example.org");
let (client, server) = logged_in_client().await;
let sync_settings = SyncSettings::new().timeout(Duration::from_millis(3000));
let mut ev_builder = EventBuilder::new();
ev_builder.add_joined_room(JoinedRoomBuilder::new(room_id));
mock_sync(&server, ev_builder.build_json_sync_response(), None).await;
let _response = client.sync_once(sync_settings.clone()).await.unwrap();
server.reset().await;
mock_encryption_state(&server, false).await;
let room = client.get_room(room_id).unwrap();
let timeline = Arc::new(room.timeline().await);
let (_, mut timeline_stream) =
timeline.subscribe_filter_map(|item| item.as_event().cloned()).await;
// Send two messages without mocking the server response.
// It will respond with a 404, resulting in a failed-to-send state.
timeline.send(RoomMessageEventContent::text_plain("First!").into(), Some("1".into())).await;
timeline.send(RoomMessageEventContent::text_plain("Second.").into(), Some("2".into())).await;
// Local echoes are available as soon as `timeline.send` returns
assert_next_matches!(timeline_stream, VectorDiff::PushBack { value } => {
assert_eq!(value.content().as_message().unwrap().body(), "First!");
});
assert_next_matches!(timeline_stream, VectorDiff::PushBack { value } => {
assert_eq!(value.content().as_message().unwrap().body(), "Second.");
});
// Local echoes are updated with the failed send state as soon as
// the 404 response is received
assert_matches!(timeline_stream.next().await, Some(VectorDiff::Set { index: 0, value }) => {
assert_matches!(value.send_state().unwrap(), EventSendState::SendingFailed { .. });
});
// The second one is cancelled without an extra delay
assert_next_matches!(timeline_stream, VectorDiff::Set { index: 1, value } => {
assert_matches!(value.send_state().unwrap(), EventSendState::Cancelled);
});
// Response for first message takes 100ms to respond
Mock::given(method("PUT"))
.and(path_regex(r"^/_matrix/client/r0/rooms/.*/send/.*"))
.and(body_string_contains("First!"))
.respond_with(
ResponseTemplate::new(200)
.set_body_json(&json!({ "event_id": "$PyHxV5mYzjetBUT3qZq7V95GOzxb02EP" }))
.set_delay(Duration::from_millis(100)),
)
.mount(&server)
.await;
// Response for second message takes 200ms to respond, so should come back
// after first if we don't serialize retries
Mock::given(method("PUT"))
.and(path_regex(r"^/_matrix/client/r0/rooms/.*/send/.*"))
.and(body_string_contains("Second."))
.respond_with(
ResponseTemplate::new(200)
.set_body_json(&json!({ "event_id": "$5E2kLK/Sg342bgBU9ceEIEPYpbFaqJpZ" }))
.set_delay(Duration::from_millis(200)),
)
.mount(&server)
.await;
// Retry the second message first
timeline.retry_send("2".into()).await.unwrap();
timeline.retry_send("1".into()).await.unwrap();
// Both items are immediately updated to indicate they are being sent
assert_next_matches!(timeline_stream, VectorDiff::Set { index: 1, value } => {
assert_matches!(value.send_state().unwrap(), EventSendState::NotSentYet);
assert_eq!(value.content().as_message().unwrap().body(), "Second.");
});
assert_next_matches!(timeline_stream, VectorDiff::Set { index: 0, value } => {
assert_matches!(value.send_state().unwrap(), EventSendState::NotSentYet);
assert_eq!(value.content().as_message().unwrap().body(), "First!");
});
// Wait 200ms for the first msg, 100ms for the second, 100ms for overhead
sleep(Duration::from_millis(400)).await;
// The second item should be updated first, since it was retried first
assert_next_matches!(timeline_stream, VectorDiff::Set { index: 1, value } => {
assert_eq!(value.content().as_message().unwrap().body(), "Second.");
assert_matches!(value.send_state().unwrap(), EventSendState::Sent { .. });
assert_eq!(value.event_id().unwrap(), "$5E2kLK/Sg342bgBU9ceEIEPYpbFaqJpZ");
});
// Then the first one
assert_next_matches!(timeline_stream, VectorDiff::Set { index: 0, value } => {
assert_eq!(value.content().as_message().unwrap().body(), "First!");
assert_matches!(value.send_state().unwrap(), EventSendState::Sent { .. });
assert_eq!(value.event_id().unwrap(), "$PyHxV5mYzjetBUT3qZq7V95GOzxb02EP");
});
assert_pending!(timeline_stream);
}