feat(sdk): Implement LatestEvent::update_with_send_queue.

This patch implements `LatestEvent::update_with_send_queue`.
It introduces an intermediate type, for the sake of clarity,
`LatestEventValuesForLocalEvents`.

The difficulty here is to keep a buffer of `LatestEventValue`s requested
by the `SendQueue`. Why? Because we want the latest event value, but we
only receive `RoomSendQueueUpdate`s, we can't iterate over local events
in the `SendQueue` like we do for the `EventCache` to re-compute the
latest event if a local event has been cancelled or updated.

A particular care must also be applied when a local event is wedged:
this local event and all its followings must be marked as wedged too,
so that the `LatestEventValue` is `LocalIsWedged`. Same when the local
event is unwedged.
This commit is contained in:
Ivan Enderlin
2025-08-11 10:45:57 +02:00
parent f2fbdfbac2
commit 9cc29d7c65
2 changed files with 369 additions and 4 deletions

View File

@@ -46,6 +46,11 @@ pub(super) struct LatestEvent {
/// The thread (if any) owning this latest event.
_thread_id: Option<OwnedEventId>,
/// A buffer of the current [`LatestEventValue`] computed for local events
/// seen by the send queue. See [`LatestEventValuesForLocalEvents`] to learn
/// more.
buffer_of_values_for_local_events: LatestEventValuesForLocalEvents,
/// The latest event value.
current_value: SharedObservable<LatestEventValue, AsyncLock>,
}
@@ -60,6 +65,7 @@ impl LatestEvent {
Self {
_room_id: room_id.to_owned(),
_thread_id: thread_id.map(ToOwned::to_owned),
buffer_of_values_for_local_events: LatestEventValuesForLocalEvents::new(),
current_value: SharedObservable::new_async(
LatestEventValue::new_remote(room_event_cache, weak_room).await,
),
@@ -86,8 +92,21 @@ impl LatestEvent {
/// Update the inner latest event value, based on the send queue
/// (specifically with a [`RoomSendQueueUpdate`]).
pub async fn update_with_send_queue(&mut self, send_queue_update: &RoomSendQueueUpdate) {
todo!()
pub async fn update_with_send_queue(
&mut self,
send_queue_update: &RoomSendQueueUpdate,
room_event_cache: &RoomEventCache,
power_levels: &Option<(&UserId, RoomPowerLevels)>,
) {
let new_value = LatestEventValue::new_local(
send_queue_update,
&mut self.buffer_of_values_for_local_events,
room_event_cache,
power_levels,
)
.await;
self.update(new_value).await;
}
/// Update [`Self::current_value`] if and only if the `new_value` is not
@@ -150,6 +169,332 @@ impl LatestEventValue {
.map(Self::Remote)
.unwrap_or_default()
}
/// Create a new [`LatestEventValue::LocalIsSending`] or
/// [`LatestEventValue::LocalIsWedged`].
async fn new_local(
send_queue_update: &RoomSendQueueUpdate,
buffer_of_values_for_local_events: &mut LatestEventValuesForLocalEvents,
room_event_cache: &RoomEventCache,
power_levels: &Option<(&UserId, RoomPowerLevels)>,
) -> Self {
use crate::send_queue::{LocalEcho, LocalEchoContent};
match send_queue_update {
// A new local event is being sent.
//
// Let's create the `LatestEventValue` and push it in the buffer of values.
RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
transaction_id,
content: local_echo_content,
}) => match local_echo_content {
LocalEchoContent::Event { serialized_event: content, .. } => {
if let Ok(content) = content.deserialize() {
if let Some(kind) = find_and_map_any_message_like_event_content(content) {
let value = Self::LocalIsSending(kind);
buffer_of_values_for_local_events
.push(transaction_id.to_owned(), value.clone());
value
} else {
Self::None
}
} else {
Self::None
}
}
LocalEchoContent::React { .. } => Self::None,
},
// A local event has been cancelled before being sent.
//
// Remove the calculated `LatestEventValue` from the buffer of values, and return the
// last `LatestEventValue` or calculate a new one.
RoomSendQueueUpdate::CancelledLocalEvent { transaction_id } => {
if let Some(position) = buffer_of_values_for_local_events.position(transaction_id) {
buffer_of_values_for_local_events.remove(position);
}
Self::new_local_or_remote(
buffer_of_values_for_local_events,
room_event_cache,
power_levels,
)
.await
}
// A local event has successfully been sent!
//
// Unwedge all wedged values after the one matching `transaction_id`. Indeed, if
// an event has been sent, it means the send queue is working, so if any value has been
// marked as wedged, it must be marked as unwedged. Then, remove the calculated
// `LatestEventValue` from the buffer of values. Finally, return the last
// `LatestEventValue` or calculate a new one.
RoomSendQueueUpdate::SentEvent { transaction_id, .. } => {
let position = buffer_of_values_for_local_events.unwedged_after(transaction_id);
if let Some(position) = position {
buffer_of_values_for_local_events.remove(position);
}
Self::new_local_or_remote(
buffer_of_values_for_local_events,
room_event_cache,
power_levels,
)
.await
}
// A local event has been replaced by another one.
//
// Replace the latest event value matching `transaction_id` in the buffer if it exists
// (note: it should!), and return the last `LatestEventValue` or calculate a new one.
RoomSendQueueUpdate::ReplacedLocalEvent { transaction_id, new_content: content } => {
if let Some(position) = buffer_of_values_for_local_events.position(transaction_id) {
if let Ok(content) = content.deserialize() {
if let Some(kind) = find_and_map_any_message_like_event_content(content) {
buffer_of_values_for_local_events.replace_kind(position, kind);
}
} else {
return Self::None;
}
}
Self::new_local_or_remote(
buffer_of_values_for_local_events,
room_event_cache,
power_levels,
)
.await
}
// An error has occurred.
//
// Mark the latest event value matching `transaction_id`, and all its following values,
// as wedged.
RoomSendQueueUpdate::SendError { transaction_id, .. } => {
buffer_of_values_for_local_events.wedged_from(transaction_id);
Self::new_local_or_remote(
buffer_of_values_for_local_events,
room_event_cache,
power_levels,
)
.await
}
// A local event has been unwedged and sending is being retried.
//
// Mark the latest event value matching `transaction_id`, and all its following values,
// as unwedged.
RoomSendQueueUpdate::RetryEvent { transaction_id } => {
buffer_of_values_for_local_events.unwedged_from(transaction_id);
Self::new_local_or_remote(
buffer_of_values_for_local_events,
room_event_cache,
power_levels,
)
.await
}
// A media upload has made progress.
//
// Nothing to do here.
RoomSendQueueUpdate::MediaUpload { .. } => Self::None,
}
}
/// Get the last [`LatestEventValue`] from the local latest event values if
/// any, or create a new [`LatestEventValue`] from the remote events.
///
/// If the buffer of latest event values is not empty, let's return the last
/// one. Otherwise, it means we no longer have any local event: let's
/// fallback on remote event!
async fn new_local_or_remote(
buffer_of_values_for_local_events: &mut LatestEventValuesForLocalEvents,
room_event_cache: &RoomEventCache,
power_levels: &Option<(&UserId, RoomPowerLevels)>,
) -> Self {
if let Some(value) = buffer_of_values_for_local_events.last() {
value.clone()
} else {
Self::new_remote_with_power_levels(room_event_cache, power_levels).await
}
}
}
/// A buffer of the current [`LatestEventValue`] computed for local events
/// seen by the send queue. It is used by
/// [`LatestEvent::buffer_of_values_for_local_events`].
///
/// The system does only receive [`RoomSendQueueUpdate`]s. It's not designed to
/// iterate over local events in the send queue when a local event is changed
/// (cancelled, or updated for example). That's why we keep our own buffer here.
/// Imagine the system receives 4 [`RoomSendQueueUpdate`]:
///
/// 1. [`RoomSendQueueUpdate::NewLocalEvent`]: new local event,
/// 2. [`RoomSendQueueUpdate::NewLocalEvent`]: new local event,
/// 3. [`RoomSendQueueUpdate::ReplacedLocalEvent`]: replaced the first local
/// event,
/// 4. [`RoomSendQueueUpdate::CancelledLocalEvent`]: cancelled the second local
/// event.
///
/// `NewLocalEvent`s will trigger the computation of new
/// `LatestEventValue`s, but `CancelledLocalEvent` for example doesn't hold
/// any information to compute a new `LatestEventValue`, so we need to
/// remember the previous values, until the local events are sent and
/// removed from this buffer.
///
/// Another reason why we need a buffer is to handle wedged local event. Imagine
/// the system receives 3 [`RoomSendQueueUpdate`]:
///
/// 1. [`RoomSendQueueUpdate::NewLocalEvent`]: new local event,
/// 2. [`RoomSendQueueUpdate::NewLocalEvent`]: new local event,
/// 3. [`RoomSendQueueUpdate::SendError`]: the first local event has failed to
/// be sent.
///
/// Because a `SendError` is received (targeting the first `NewLocalEvent`), the
/// send queue is stopped. However, the `LatestEventValue` targets the second
/// `NewLocalEvent`. The system must consider that when a local event is wedged,
/// all the following local events must also be marked as wedged. And vice
/// versa, when the send queue is able to send an event again, all the following
/// local events must be marked as unwedged.
///
/// This type isolates a couple of methods designed to manage these specific
/// behaviours.
#[derive(Debug)]
struct LatestEventValuesForLocalEvents {
buffer: Vec<(OwnedTransactionId, LatestEventValue)>,
}
impl LatestEventValuesForLocalEvents {
/// Create a new [`LatestEventValuesForLocalEvents`].
fn new() -> Self {
Self { buffer: Vec::with_capacity(2) }
}
/// Get the last [`LatestEventValue`].
fn last(&self) -> Option<&LatestEventValue> {
self.buffer.last().map(|(_, value)| value)
}
/// Find the position of the [`LatestEventValue`] matching `transaction_id`.
fn position(&self, transaction_id: &TransactionId) -> Option<usize> {
self.buffer
.iter()
.position(|(transaction_id_candidate, _)| transaction_id == transaction_id_candidate)
}
/// Push a new [`LatestEventValue`].
///
/// # Panics
///
/// Panics if `value` is not of kind [`LatestEventValue::LocalIsSending`] or
/// [`LatestEventValue::LocalIsWedged`].
fn push(&mut self, transaction_id: OwnedTransactionId, value: LatestEventValue) {
assert!(
matches!(
value,
LatestEventValue::LocalIsSending(_) | LatestEventValue::LocalIsWedged(_)
),
"`value` must be either `LocalIsSending` or `LocalIsWedged`"
);
self.buffer.push((transaction_id, value));
}
/// Replace the [`LatestEventKind`] of the [`LatestEventValue`] at position
/// `position`.
///
/// # Panics
///
/// Panics if:
/// - `position` is strictly greater than buffer's length,
/// - the [`LatestEventValue`] is not of kind
/// [`LatestEventValue::LocalIsSending`] or
/// [`LatestEventValue::LocalIsWedged`].
fn replace_kind(&mut self, position: usize, new_kind: LatestEventKind) {
let (_, value) = self.buffer.get_mut(position).expect("`position` must be valid");
match value {
LatestEventValue::LocalIsSending(kind) => *kind = new_kind,
LatestEventValue::LocalIsWedged(kind) => *kind = new_kind,
_ => panic!("`value` must be either `LocalIsSending` or `LocalIsWedged`"),
}
}
/// Remove the [`LatestEventValue`] at position `position`.
///
/// # Panics
///
/// Panics if `position` is strictly greater than buffer's length.
fn remove(&mut self, position: usize) -> (OwnedTransactionId, LatestEventValue) {
self.buffer.remove(position)
}
/// Mark the `LatestEventValue` matching `transaction_id`, and all the
/// following values, as wedged.
fn wedged_from(&mut self, transaction_id: &TransactionId) {
let mut values = self.buffer.iter_mut();
if let Some(first_value_to_wedge) = values
.by_ref()
.find(|(transaction_id_candidate, _)| transaction_id == transaction_id_candidate)
{
// Iterate over the found value and the following ones.
for (_, value_to_wedge) in once(first_value_to_wedge).chain(values) {
if let LatestEventValue::LocalIsSending(kind) = value_to_wedge {
*value_to_wedge = LatestEventValue::LocalIsWedged(kind.clone());
}
}
}
}
/// Mark the `LatestEventValue` matching `transaction_id`, and all the
/// following values, as unwedged.
fn unwedged_from(&mut self, transaction_id: &TransactionId) {
let mut values = self.buffer.iter_mut();
if let Some(first_value_to_unwedge) = values
.by_ref()
.find(|(transaction_id_candidate, _)| transaction_id == transaction_id_candidate)
{
// Iterate over the found value and the following ones.
for (_, value_to_unwedge) in once(first_value_to_unwedge).chain(values) {
if let LatestEventValue::LocalIsWedged(kind) = value_to_unwedge {
*value_to_unwedge = LatestEventValue::LocalIsSending(kind.clone());
}
}
}
}
/// Mark all the following values after the `LatestEventValue` matching
/// `transaction_id` as unwedged.
///
/// Note that contrary to [`Self::unwedged_from`], the `LatestEventValue` is
/// untouched. However, its position is returned (if any).
fn unwedged_after(&mut self, transaction_id: &TransactionId) -> Option<usize> {
let mut values = self.buffer.iter_mut();
if let Some(position) = values
.by_ref()
.position(|(transaction_id_candidate, _)| transaction_id == transaction_id_candidate)
{
// Iterate over all values after the found one.
for (_, value_to_unwedge) in values {
if let LatestEventValue::LocalIsWedged(kind) = value_to_unwedge {
*value_to_unwedge = LatestEventValue::LocalIsSending(kind.clone());
}
}
Some(position)
} else {
None
}
}
}
/// A latest event value!

View File

@@ -539,10 +539,30 @@ impl RoomLatestEvents {
/// Update the latest events for the room and its threads, based on the
/// send queue update.
async fn update_with_send_queue(&mut self, send_queue_update: &RoomSendQueueUpdate) {
self.for_the_room.update_with_send_queue(send_queue_update).await;
// Get the power levels of the user for the current room if the `WeakRoom` is
// still valid.
//
// Get it once for all the updates of all the latest events for this room (be
// the room and its threads).
let room = self.weak_room.get();
let power_levels = match &room {
Some(room) => {
let power_levels = room.power_levels().await.ok();
Some(room.own_user_id()).zip(power_levels)
}
None => None,
};
self.for_the_room
.update_with_send_queue(send_queue_update, &self.room_event_cache, &power_levels)
.await;
for latest_event in self.per_thread.values_mut() {
latest_event.update_with_send_queue(send_queue_update).await;
latest_event
.update_with_send_queue(send_queue_update, &self.room_event_cache, &power_levels)
.await;
}
}
}