diff --git a/crates/matrix-sdk/src/latest_events/latest_event.rs b/crates/matrix-sdk/src/latest_events/latest_event.rs index c6a1445d1..09bbaf0d9 100644 --- a/crates/matrix-sdk/src/latest_events/latest_event.rs +++ b/crates/matrix-sdk/src/latest_events/latest_event.rs @@ -46,6 +46,11 @@ pub(super) struct LatestEvent { /// The thread (if any) owning this latest event. _thread_id: Option, + /// A buffer of the current [`LatestEventValue`] computed for local events + /// seen by the send queue. See [`LatestEventValuesForLocalEvents`] to learn + /// more. + buffer_of_values_for_local_events: LatestEventValuesForLocalEvents, + /// The latest event value. current_value: SharedObservable, } @@ -60,6 +65,7 @@ impl LatestEvent { Self { _room_id: room_id.to_owned(), _thread_id: thread_id.map(ToOwned::to_owned), + buffer_of_values_for_local_events: LatestEventValuesForLocalEvents::new(), current_value: SharedObservable::new_async( LatestEventValue::new_remote(room_event_cache, weak_room).await, ), @@ -86,8 +92,21 @@ impl LatestEvent { /// Update the inner latest event value, based on the send queue /// (specifically with a [`RoomSendQueueUpdate`]). - pub async fn update_with_send_queue(&mut self, send_queue_update: &RoomSendQueueUpdate) { - todo!() + pub async fn update_with_send_queue( + &mut self, + send_queue_update: &RoomSendQueueUpdate, + room_event_cache: &RoomEventCache, + power_levels: &Option<(&UserId, RoomPowerLevels)>, + ) { + let new_value = LatestEventValue::new_local( + send_queue_update, + &mut self.buffer_of_values_for_local_events, + room_event_cache, + power_levels, + ) + .await; + + self.update(new_value).await; } /// Update [`Self::current_value`] if and only if the `new_value` is not @@ -150,6 +169,332 @@ impl LatestEventValue { .map(Self::Remote) .unwrap_or_default() } + + /// Create a new [`LatestEventValue::LocalIsSending`] or + /// [`LatestEventValue::LocalIsWedged`]. + async fn new_local( + send_queue_update: &RoomSendQueueUpdate, + buffer_of_values_for_local_events: &mut LatestEventValuesForLocalEvents, + room_event_cache: &RoomEventCache, + power_levels: &Option<(&UserId, RoomPowerLevels)>, + ) -> Self { + use crate::send_queue::{LocalEcho, LocalEchoContent}; + + match send_queue_update { + // A new local event is being sent. + // + // Let's create the `LatestEventValue` and push it in the buffer of values. + RoomSendQueueUpdate::NewLocalEvent(LocalEcho { + transaction_id, + content: local_echo_content, + }) => match local_echo_content { + LocalEchoContent::Event { serialized_event: content, .. } => { + if let Ok(content) = content.deserialize() { + if let Some(kind) = find_and_map_any_message_like_event_content(content) { + let value = Self::LocalIsSending(kind); + + buffer_of_values_for_local_events + .push(transaction_id.to_owned(), value.clone()); + + value + } else { + Self::None + } + } else { + Self::None + } + } + + LocalEchoContent::React { .. } => Self::None, + }, + + // A local event has been cancelled before being sent. + // + // Remove the calculated `LatestEventValue` from the buffer of values, and return the + // last `LatestEventValue` or calculate a new one. + RoomSendQueueUpdate::CancelledLocalEvent { transaction_id } => { + if let Some(position) = buffer_of_values_for_local_events.position(transaction_id) { + buffer_of_values_for_local_events.remove(position); + } + + Self::new_local_or_remote( + buffer_of_values_for_local_events, + room_event_cache, + power_levels, + ) + .await + } + + // A local event has successfully been sent! + // + // Unwedge all wedged values after the one matching `transaction_id`. Indeed, if + // an event has been sent, it means the send queue is working, so if any value has been + // marked as wedged, it must be marked as unwedged. Then, remove the calculated + // `LatestEventValue` from the buffer of values. Finally, return the last + // `LatestEventValue` or calculate a new one. + RoomSendQueueUpdate::SentEvent { transaction_id, .. } => { + let position = buffer_of_values_for_local_events.unwedged_after(transaction_id); + + if let Some(position) = position { + buffer_of_values_for_local_events.remove(position); + } + + Self::new_local_or_remote( + buffer_of_values_for_local_events, + room_event_cache, + power_levels, + ) + .await + } + + // A local event has been replaced by another one. + // + // Replace the latest event value matching `transaction_id` in the buffer if it exists + // (note: it should!), and return the last `LatestEventValue` or calculate a new one. + RoomSendQueueUpdate::ReplacedLocalEvent { transaction_id, new_content: content } => { + if let Some(position) = buffer_of_values_for_local_events.position(transaction_id) { + if let Ok(content) = content.deserialize() { + if let Some(kind) = find_and_map_any_message_like_event_content(content) { + buffer_of_values_for_local_events.replace_kind(position, kind); + } + } else { + return Self::None; + } + } + + Self::new_local_or_remote( + buffer_of_values_for_local_events, + room_event_cache, + power_levels, + ) + .await + } + + // An error has occurred. + // + // Mark the latest event value matching `transaction_id`, and all its following values, + // as wedged. + RoomSendQueueUpdate::SendError { transaction_id, .. } => { + buffer_of_values_for_local_events.wedged_from(transaction_id); + + Self::new_local_or_remote( + buffer_of_values_for_local_events, + room_event_cache, + power_levels, + ) + .await + } + + // A local event has been unwedged and sending is being retried. + // + // Mark the latest event value matching `transaction_id`, and all its following values, + // as unwedged. + RoomSendQueueUpdate::RetryEvent { transaction_id } => { + buffer_of_values_for_local_events.unwedged_from(transaction_id); + + Self::new_local_or_remote( + buffer_of_values_for_local_events, + room_event_cache, + power_levels, + ) + .await + } + + // A media upload has made progress. + // + // Nothing to do here. + RoomSendQueueUpdate::MediaUpload { .. } => Self::None, + } + } + + /// Get the last [`LatestEventValue`] from the local latest event values if + /// any, or create a new [`LatestEventValue`] from the remote events. + /// + /// If the buffer of latest event values is not empty, let's return the last + /// one. Otherwise, it means we no longer have any local event: let's + /// fallback on remote event! + async fn new_local_or_remote( + buffer_of_values_for_local_events: &mut LatestEventValuesForLocalEvents, + room_event_cache: &RoomEventCache, + power_levels: &Option<(&UserId, RoomPowerLevels)>, + ) -> Self { + if let Some(value) = buffer_of_values_for_local_events.last() { + value.clone() + } else { + Self::new_remote_with_power_levels(room_event_cache, power_levels).await + } + } +} + +/// A buffer of the current [`LatestEventValue`] computed for local events +/// seen by the send queue. It is used by +/// [`LatestEvent::buffer_of_values_for_local_events`]. +/// +/// The system does only receive [`RoomSendQueueUpdate`]s. It's not designed to +/// iterate over local events in the send queue when a local event is changed +/// (cancelled, or updated for example). That's why we keep our own buffer here. +/// Imagine the system receives 4 [`RoomSendQueueUpdate`]: +/// +/// 1. [`RoomSendQueueUpdate::NewLocalEvent`]: new local event, +/// 2. [`RoomSendQueueUpdate::NewLocalEvent`]: new local event, +/// 3. [`RoomSendQueueUpdate::ReplacedLocalEvent`]: replaced the first local +/// event, +/// 4. [`RoomSendQueueUpdate::CancelledLocalEvent`]: cancelled the second local +/// event. +/// +/// `NewLocalEvent`s will trigger the computation of new +/// `LatestEventValue`s, but `CancelledLocalEvent` for example doesn't hold +/// any information to compute a new `LatestEventValue`, so we need to +/// remember the previous values, until the local events are sent and +/// removed from this buffer. +/// +/// Another reason why we need a buffer is to handle wedged local event. Imagine +/// the system receives 3 [`RoomSendQueueUpdate`]: +/// +/// 1. [`RoomSendQueueUpdate::NewLocalEvent`]: new local event, +/// 2. [`RoomSendQueueUpdate::NewLocalEvent`]: new local event, +/// 3. [`RoomSendQueueUpdate::SendError`]: the first local event has failed to +/// be sent. +/// +/// Because a `SendError` is received (targeting the first `NewLocalEvent`), the +/// send queue is stopped. However, the `LatestEventValue` targets the second +/// `NewLocalEvent`. The system must consider that when a local event is wedged, +/// all the following local events must also be marked as wedged. And vice +/// versa, when the send queue is able to send an event again, all the following +/// local events must be marked as unwedged. +/// +/// This type isolates a couple of methods designed to manage these specific +/// behaviours. +#[derive(Debug)] +struct LatestEventValuesForLocalEvents { + buffer: Vec<(OwnedTransactionId, LatestEventValue)>, +} + +impl LatestEventValuesForLocalEvents { + /// Create a new [`LatestEventValuesForLocalEvents`]. + fn new() -> Self { + Self { buffer: Vec::with_capacity(2) } + } + + /// Get the last [`LatestEventValue`]. + fn last(&self) -> Option<&LatestEventValue> { + self.buffer.last().map(|(_, value)| value) + } + + /// Find the position of the [`LatestEventValue`] matching `transaction_id`. + fn position(&self, transaction_id: &TransactionId) -> Option { + self.buffer + .iter() + .position(|(transaction_id_candidate, _)| transaction_id == transaction_id_candidate) + } + + /// Push a new [`LatestEventValue`]. + /// + /// # Panics + /// + /// Panics if `value` is not of kind [`LatestEventValue::LocalIsSending`] or + /// [`LatestEventValue::LocalIsWedged`]. + fn push(&mut self, transaction_id: OwnedTransactionId, value: LatestEventValue) { + assert!( + matches!( + value, + LatestEventValue::LocalIsSending(_) | LatestEventValue::LocalIsWedged(_) + ), + "`value` must be either `LocalIsSending` or `LocalIsWedged`" + ); + + self.buffer.push((transaction_id, value)); + } + + /// Replace the [`LatestEventKind`] of the [`LatestEventValue`] at position + /// `position`. + /// + /// # Panics + /// + /// Panics if: + /// - `position` is strictly greater than buffer's length, + /// - the [`LatestEventValue`] is not of kind + /// [`LatestEventValue::LocalIsSending`] or + /// [`LatestEventValue::LocalIsWedged`]. + fn replace_kind(&mut self, position: usize, new_kind: LatestEventKind) { + let (_, value) = self.buffer.get_mut(position).expect("`position` must be valid"); + + match value { + LatestEventValue::LocalIsSending(kind) => *kind = new_kind, + LatestEventValue::LocalIsWedged(kind) => *kind = new_kind, + _ => panic!("`value` must be either `LocalIsSending` or `LocalIsWedged`"), + } + } + + /// Remove the [`LatestEventValue`] at position `position`. + /// + /// # Panics + /// + /// Panics if `position` is strictly greater than buffer's length. + fn remove(&mut self, position: usize) -> (OwnedTransactionId, LatestEventValue) { + self.buffer.remove(position) + } + + /// Mark the `LatestEventValue` matching `transaction_id`, and all the + /// following values, as wedged. + fn wedged_from(&mut self, transaction_id: &TransactionId) { + let mut values = self.buffer.iter_mut(); + + if let Some(first_value_to_wedge) = values + .by_ref() + .find(|(transaction_id_candidate, _)| transaction_id == transaction_id_candidate) + { + // Iterate over the found value and the following ones. + for (_, value_to_wedge) in once(first_value_to_wedge).chain(values) { + if let LatestEventValue::LocalIsSending(kind) = value_to_wedge { + *value_to_wedge = LatestEventValue::LocalIsWedged(kind.clone()); + } + } + } + } + + /// Mark the `LatestEventValue` matching `transaction_id`, and all the + /// following values, as unwedged. + fn unwedged_from(&mut self, transaction_id: &TransactionId) { + let mut values = self.buffer.iter_mut(); + + if let Some(first_value_to_unwedge) = values + .by_ref() + .find(|(transaction_id_candidate, _)| transaction_id == transaction_id_candidate) + { + // Iterate over the found value and the following ones. + for (_, value_to_unwedge) in once(first_value_to_unwedge).chain(values) { + if let LatestEventValue::LocalIsWedged(kind) = value_to_unwedge { + *value_to_unwedge = LatestEventValue::LocalIsSending(kind.clone()); + } + } + } + } + + /// Mark all the following values after the `LatestEventValue` matching + /// `transaction_id` as unwedged. + /// + /// Note that contrary to [`Self::unwedged_from`], the `LatestEventValue` is + /// untouched. However, its position is returned (if any). + fn unwedged_after(&mut self, transaction_id: &TransactionId) -> Option { + let mut values = self.buffer.iter_mut(); + + if let Some(position) = values + .by_ref() + .position(|(transaction_id_candidate, _)| transaction_id == transaction_id_candidate) + { + // Iterate over all values after the found one. + for (_, value_to_unwedge) in values { + if let LatestEventValue::LocalIsWedged(kind) = value_to_unwedge { + *value_to_unwedge = LatestEventValue::LocalIsSending(kind.clone()); + } + } + + Some(position) + } else { + None + } + } } /// A latest event value! diff --git a/crates/matrix-sdk/src/latest_events/mod.rs b/crates/matrix-sdk/src/latest_events/mod.rs index 059bf23ec..85a6998ba 100644 --- a/crates/matrix-sdk/src/latest_events/mod.rs +++ b/crates/matrix-sdk/src/latest_events/mod.rs @@ -539,10 +539,30 @@ impl RoomLatestEvents { /// Update the latest events for the room and its threads, based on the /// send queue update. async fn update_with_send_queue(&mut self, send_queue_update: &RoomSendQueueUpdate) { - self.for_the_room.update_with_send_queue(send_queue_update).await; + // Get the power levels of the user for the current room if the `WeakRoom` is + // still valid. + // + // Get it once for all the updates of all the latest events for this room (be + // the room and its threads). + let room = self.weak_room.get(); + let power_levels = match &room { + Some(room) => { + let power_levels = room.power_levels().await.ok(); + + Some(room.own_user_id()).zip(power_levels) + } + + None => None, + }; + + self.for_the_room + .update_with_send_queue(send_queue_update, &self.room_event_cache, &power_levels) + .await; for latest_event in self.per_thread.values_mut() { - latest_event.update_with_send_queue(send_queue_update).await; + latest_event + .update_with_send_queue(send_queue_update, &self.room_event_cache, &power_levels) + .await; } } }