timeline queue: tiny refactorings

A few renamings here and there, making use of `as_variant!` a bit more,
adding a few comments,…
This commit is contained in:
Benjamin Bouvier
2024-05-17 18:00:46 +02:00
parent 8867a03c07
commit e49d62988b
4 changed files with 98 additions and 76 deletions

View File

@@ -14,6 +14,7 @@
use std::sync::Arc;
use as_variant::as_variant;
use indexmap::IndexMap;
use matrix_sdk::{deserialized_responses::EncryptionInfo, Client, Error};
use matrix_sdk_base::{deserialized_responses::SyncTimelineEvent, latest_event::LatestEvent};
@@ -180,44 +181,31 @@ impl EventTimelineItem {
/// Get the `LocalEventTimelineItem` if `self` is `Local`.
pub(super) fn as_local(&self) -> Option<&LocalEventTimelineItem> {
match &self.kind {
EventTimelineItemKind::Local(local_event_item) => Some(local_event_item),
EventTimelineItemKind::Remote(_) => None,
}
as_variant!(&self.kind, EventTimelineItemKind::Local(local_event_item) => local_event_item)
}
/// Get the `RemoteEventTimelineItem` if `self` is `Remote`.
/// Get a reference to a [`RemoteEventTimelineItem`] if it's a remote echo.
pub(super) fn as_remote(&self) -> Option<&RemoteEventTimelineItem> {
match &self.kind {
EventTimelineItemKind::Local(_) => None,
EventTimelineItemKind::Remote(remote_event_item) => Some(remote_event_item),
}
as_variant!(&self.kind, EventTimelineItemKind::Remote(remote_event_item) => remote_event_item)
}
/// Get a mutable reference to a [`RemoteEventTimelineItem`] if it's a
/// remote echo.
pub(super) fn as_remote_mut(&mut self) -> Option<&mut RemoteEventTimelineItem> {
match &mut self.kind {
EventTimelineItemKind::Local(_) => None,
EventTimelineItemKind::Remote(remote_event_item) => Some(remote_event_item),
}
as_variant!(&mut self.kind, EventTimelineItemKind::Remote(remote_event_item) => remote_event_item)
}
/// Get the event's send state, if it is a local echo.
/// Get the event's send state of a local echo.
pub fn send_state(&self) -> Option<&EventSendState> {
match &self.kind {
EventTimelineItemKind::Local(local) => Some(&local.send_state),
EventTimelineItemKind::Remote(_) => None,
}
as_variant!(&self.kind, EventTimelineItemKind::Local(local) => &local.send_state)
}
/// Get the transaction ID of this item.
/// Get the transaction ID of a local echo item.
///
/// The transaction ID is currently only kept until the remote echo for a
/// local event is received.
pub fn transaction_id(&self) -> Option<&TransactionId> {
match &self.kind {
EventTimelineItemKind::Local(local) => Some(&local.transaction_id),
EventTimelineItemKind::Remote(_) => None,
}
as_variant!(&self.kind, EventTimelineItemKind::Local(local) => &local.transaction_id)
}
/// Get the event ID of this item.

View File

@@ -645,7 +645,8 @@ impl<P: RoomDataProvider> TimelineInner<P> {
/// Update the send state of a local event represented by a transaction ID.
///
/// If no local event is found, a warning is raised.
/// If the corresponding local timeline item is missing, a warning is
/// raised.
#[instrument(skip_all, fields(txn_id))]
pub(super) async fn update_event_send_state(
&self,
@@ -718,9 +719,11 @@ impl<P: RoomDataProvider> TimelineInner<P> {
txn.items.set(idx, new_item);
if is_error {
// When there is an error, sending further messages is paused. This
// should be reflected in the timeline, so we set all other pending
// events to cancelled.
// When there is an error, sending further messages is paused. This should be
// reflected in the timeline, so we set all other pending events to
// cancelled.
//
// TODO(bnjbvr): spooky action at a distance here^
let items = &mut txn.items;
let num_items = items.len();
for idx in 0..num_items {
@@ -797,6 +800,10 @@ impl<P: RoomDataProvider> TimelineInner<P> {
Ok(follow_up_action)
}
/// Attempts to find a local echo item for some event to be sent.
///
/// If it's found, it's moved back to the end of the timeline, then its
/// content is returned by this function.
pub(super) async fn prepare_retry(
&self,
txn_id: &TransactionId,

View File

@@ -571,33 +571,32 @@ impl Timeline {
}
let item = self.inner.prepare_retry(txn_id).await.ok_or(Error::RetryEventNotInTimeline)?;
let content = match item {
TimelineItemContent::Message(msg) => {
AnyMessageLikeEventContent::RoomMessage(msg.into())
}
TimelineItemContent::RedactedMessage => {
error_return!("Invalid state: attempting to retry a redacted message");
}
TimelineItemContent::Sticker(sticker) => {
AnyMessageLikeEventContent::Sticker(sticker.content)
}
TimelineItemContent::UnableToDecrypt(_) => {
error_return!("Invalid state: attempting to retry a UTD item");
}
TimelineItemContent::Poll(poll_state) => AnyMessageLikeEventContent::UnstablePollStart(
UnstablePollStartEventContent::New(poll_state.into()),
),
TimelineItemContent::MembershipChange(_)
| TimelineItemContent::ProfileChange(_)
| TimelineItemContent::OtherState(_) => {
error_return!("Retrying state events is not currently supported");
| TimelineItemContent::OtherState(_)
| TimelineItemContent::CallInvite => {
error_return!("Retrying state events/call invite is not currently supported");
}
TimelineItemContent::FailedToParseMessageLike { .. }
| TimelineItemContent::FailedToParseState { .. } => {
error_return!("Invalid state: attempting to retry a failed-to-parse item");
}
TimelineItemContent::Poll(poll_state) => AnyMessageLikeEventContent::UnstablePollStart(
UnstablePollStartEventContent::New(poll_state.into()),
),
TimelineItemContent::CallInvite => {
error_return!("Retrying call events is not currently supported");
TimelineItemContent::RedactedMessage => {
error_return!("Invalid state: attempting to retry a redacted message");
}
TimelineItemContent::UnableToDecrypt(_) => {
error_return!("Invalid state: attempting to retry a UTD item");
}
};

View File

@@ -39,13 +39,14 @@ pub(super) struct LocalMessage {
///
/// Used for finding the corresponding local echo in the timeline.
pub txn_id: OwnedTransactionId,
/// The message contents.
pub content: AnyMessageLikeEventContent,
}
#[instrument(skip_all, fields(room_id = ?room.room_id()))]
pub(super) async fn send_queued_messages(
timeline_inner: TimelineInner,
timeline: TimelineInner,
room: Room,
mut msg_receiver: Receiver<LocalMessage>,
) {
@@ -58,23 +59,27 @@ pub(super) async fn send_queued_messages(
select! {
result = &mut send_task => {
trace!("SendMessageTask finished");
send_task.reset();
handle_task_ready(
handle_send_result(
result,
&mut send_task,
&mut queue,
&timeline_inner,
&timeline,
).await;
}
recv_res = &mut recv_fut => {
recv_fut = if let Some(msg) = recv_res {
trace!("Got a LocalMessage");
handle_message(
send_or_queue_msg(
msg,
room.clone(),
&mut send_task,
&mut queue,
&timeline_inner,
&timeline,
).await;
// appease the borrow checker
@@ -96,59 +101,71 @@ pub(super) async fn send_queued_messages(
info!("Stopped");
}
async fn handle_message(
async fn send_or_queue_msg(
msg: LocalMessage,
room: Room,
send_task: &mut SendMessageTask,
queue: &mut VecDeque<LocalMessage>,
timeline_inner: &TimelineInner,
timeline: &TimelineInner,
) {
if queue.is_empty() && send_task.is_idle() {
if room.state() == RoomState::Joined {
send_task.start(room, timeline_inner.clone(), msg);
} else {
info!("Refusing to send message, room is not joined");
timeline_inner
.update_event_send_state(
&msg.txn_id,
EventSendState::SendingFailed {
// FIXME: Probably not exactly right
error: Arc::new(matrix_sdk::Error::InconsistentState),
},
)
.await;
}
} else {
// Only send the message immediately if there aren't other messages to be sent
// first, and we're not currently sending a message.
if !queue.is_empty() || !send_task.is_idle() {
queue.push_back(msg);
return;
}
if room.state() != RoomState::Joined {
info!("Refusing to send message, room is not joined");
timeline
.update_event_send_state(
&msg.txn_id,
EventSendState::SendingFailed {
// FIXME: Probably not exactly right
error: Arc::new(matrix_sdk::Error::InconsistentState),
},
)
.await;
return;
}
send_task.start(room, timeline.clone(), msg);
}
async fn handle_task_ready(
result: SendMessageResult,
async fn handle_send_result(
send_result: SendMessageResult,
send_task: &mut SendMessageTask,
queue: &mut VecDeque<LocalMessage>,
timeline_inner: &TimelineInner,
timeline: &TimelineInner,
) {
match result {
match send_result {
SendMessageResult::Success { room } => {
// Event was successfully sent, move on to the next queued event.
if let Some(msg) = queue.pop_front() {
send_task.start(room, timeline_inner.clone(), msg);
send_task.start(room, timeline.clone(), msg);
}
}
SendMessageResult::SendingFailed => {
// Timeline items are marked as failed / cancelled in this case.
// Clear the timeline and wait for the user to explicitly retry.
//
// Clear the queue and wait for the user to explicitly retry (which will
// re-append the to-be-sent events in the queue).
queue.clear();
}
SendMessageResult::TaskError { join_error, txn_id } => {
error!("Message-sending task failed: {join_error}");
// See above comment in the `SendingFailed` arm.
queue.clear();
let send_state = EventSendState::SendingFailed {
// FIXME: Probably not exactly right
error: Arc::new(matrix_sdk::Error::InconsistentState),
};
timeline_inner.update_event_send_state(&txn_id, send_state).await;
timeline.update_event_send_state(&txn_id, send_state).await;
}
}
}
@@ -162,8 +179,10 @@ enum SendMessageResult {
/// in the queue, if it isn't empty.
room: Room,
},
/// Sending failed, and the local echo was updated to indicate this.
SendingFailed,
/// The [`SendMessageTask`] failed, likely due to a panic.
///
/// This means that the timeline item was likely not updated yet, which thus
@@ -181,12 +200,13 @@ enum SendMessageResult {
enum SendMessageTask {
/// No background task is active right now.
Idle,
/// A background task has been spawned, we're waiting for its result.
Running {
/// The transaction ID of the message that is being sent.
txn_id: OwnedTransactionId,
/// Handle to the task itself.
join_handle: JoinHandle<Option<Room>>,
task: JoinHandle<Option<Room>>,
},
}
@@ -196,20 +216,27 @@ impl SendMessageTask {
matches!(self, Self::Idle)
}
fn start(&mut self, room: Room, timeline_inner: TimelineInner, msg: LocalMessage) {
/// Spawns a task sending the message to the room, and updating the timeline
/// once the result has been processed.
fn start(&mut self, room: Room, timeline: TimelineInner, msg: LocalMessage) {
debug!("Spawning message-sending task");
let txn_id = msg.txn_id.clone();
let join_handle = spawn(async move {
let task = spawn(async move {
let result = room.send(msg.content).with_transaction_id(&msg.txn_id).await;
let (room, send_state) = match result {
Ok(response) => (Some(room), EventSendState::Sent { event_id: response.event_id }),
Err(error) => (None, EventSendState::SendingFailed { error: Arc::new(error) }),
};
timeline_inner.update_event_send_state(&msg.txn_id, send_state).await;
timeline.update_event_send_state(&msg.txn_id, send_state).await;
room
});
*self = Self::Running { txn_id, join_handle };
*self = Self::Running { txn_id, task };
}
fn reset(&mut self) {
@@ -223,7 +250,8 @@ impl Future for SendMessageTask {
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match &mut *self {
SendMessageTask::Idle => Poll::Pending,
SendMessageTask::Running { txn_id, join_handle } => {
SendMessageTask::Running { txn_id, task: join_handle } => {
Pin::new(join_handle).poll(cx).map(|result| {
let txn_id = mem::replace(txn_id, OwnedTransactionId::from(""));
if txn_id.as_str().is_empty() {