mirror of
https://github.com/matrix-org/matrix-rust-sdk.git
synced 2026-05-07 23:44:53 -04:00
refactor(sdk): Move LocalEventTimelineItem#event_id into send_state
This commit is contained in:
committed by
Jonas Platte
parent
a48fd77c4a
commit
c8021cf2ba
@@ -179,17 +179,17 @@ pub enum EventSendState {
|
||||
/// sending has failed.
|
||||
SendingFailed,
|
||||
/// The local event has been sent successfully to the server.
|
||||
Sent,
|
||||
Sent { event_id: String },
|
||||
}
|
||||
|
||||
impl From<matrix_sdk::room::timeline::EventSendState> for EventSendState {
|
||||
fn from(value: matrix_sdk::room::timeline::EventSendState) -> Self {
|
||||
impl From<&matrix_sdk::room::timeline::EventSendState> for EventSendState {
|
||||
fn from(value: &matrix_sdk::room::timeline::EventSendState) -> Self {
|
||||
use matrix_sdk::room::timeline::EventSendState::*;
|
||||
|
||||
match value {
|
||||
NotSentYet => Self::NotSendYet,
|
||||
SendingFailed => Self::SendingFailed,
|
||||
Sent => Self::Sent,
|
||||
Sent { event_id } => Self::Sent { event_id: event_id.to_string() },
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -269,7 +269,7 @@ impl EventTimelineItem {
|
||||
use matrix_sdk::room::timeline::EventTimelineItem::*;
|
||||
|
||||
match &self.0 {
|
||||
Local(local_event) => Some(local_event.send_state.into()),
|
||||
Local(local_event) => Some((&local_event.send_state).into()),
|
||||
Remote(_) => None,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -535,7 +535,6 @@ impl<'a, 'i> TimelineEventHandler<'a, 'i> {
|
||||
EventTimelineItem::Local(LocalEventTimelineItem {
|
||||
send_state: EventSendState::NotSentYet,
|
||||
transaction_id: txn_id.to_owned(),
|
||||
event_id: None,
|
||||
sender,
|
||||
sender_profile,
|
||||
timestamp: *timestamp,
|
||||
|
||||
@@ -88,10 +88,10 @@ impl EventTimelineItem {
|
||||
/// case of a remote event.
|
||||
pub fn unique_identifier(&self) -> String {
|
||||
match self {
|
||||
Self::Local(LocalEventTimelineItem { transaction_id, event_id, .. }) => {
|
||||
match event_id {
|
||||
Some(event_id) => event_id.to_string(),
|
||||
None => transaction_id.to_string(),
|
||||
Self::Local(LocalEventTimelineItem { transaction_id, send_state, .. }) => {
|
||||
match send_state {
|
||||
EventSendState::Sent { event_id } => event_id.to_string(),
|
||||
_ => transaction_id.to_string(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -123,7 +123,7 @@ impl EventTimelineItem {
|
||||
/// of the send request that created the event.
|
||||
pub fn event_id(&self) -> Option<&EventId> {
|
||||
match self {
|
||||
Self::Local(local_event) => local_event.event_id.as_deref(),
|
||||
Self::Local(local_event) => local_event.event_id(),
|
||||
Self::Remote(remote_event) => Some(&remote_event.event_id),
|
||||
}
|
||||
}
|
||||
@@ -209,7 +209,7 @@ impl EventTimelineItem {
|
||||
}
|
||||
|
||||
/// This type represents the "send state" of a local event timeline item.
|
||||
#[derive(Debug, Copy, Clone, PartialEq)]
|
||||
#[derive(Clone, Debug, PartialEq, Eq)]
|
||||
pub enum EventSendState {
|
||||
/// The local event has not been sent yet.
|
||||
NotSentYet,
|
||||
@@ -217,7 +217,10 @@ pub enum EventSendState {
|
||||
/// sending has failed.
|
||||
SendingFailed,
|
||||
/// The local event has been sent successfully to the server.
|
||||
Sent,
|
||||
Sent {
|
||||
/// The event ID assigned by the server.
|
||||
event_id: OwnedEventId,
|
||||
},
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
@@ -226,8 +229,6 @@ pub struct LocalEventTimelineItem {
|
||||
pub send_state: EventSendState,
|
||||
/// The transaction ID.
|
||||
pub transaction_id: OwnedTransactionId,
|
||||
/// The event ID received from the server in the event-sending response.
|
||||
pub event_id: Option<OwnedEventId>,
|
||||
/// The sender of the event.
|
||||
pub sender: OwnedUserId,
|
||||
/// The sender's profile of the event.
|
||||
@@ -239,25 +240,20 @@ pub struct LocalEventTimelineItem {
|
||||
}
|
||||
|
||||
impl LocalEventTimelineItem {
|
||||
/// Clone the current event item, and update its `event_id`.
|
||||
/// Get the event ID of this item.
|
||||
///
|
||||
/// `event_id` is optional:
|
||||
/// * `Some(_)` means the local event has been sent successfully to the
|
||||
/// server, its send state will be moved to
|
||||
/// [`LocalEventTimelineItemSendState::Sent`].
|
||||
/// * `None` means the local event has been failed to be sent to the
|
||||
/// server, its send state will be moved to
|
||||
/// [`LocalEventTimelineItemSendState::SendingFailed`].
|
||||
pub(super) fn with_event_id(&self, event_id: Option<OwnedEventId>) -> Self {
|
||||
Self {
|
||||
send_state: match &event_id {
|
||||
Some(_) => EventSendState::Sent,
|
||||
None => EventSendState::SendingFailed,
|
||||
},
|
||||
event_id,
|
||||
..self.clone()
|
||||
/// Will be `Some` if and only if `send_state` is `EventSendState::Sent`.
|
||||
pub fn event_id(&self) -> Option<&EventId> {
|
||||
match &self.send_state {
|
||||
EventSendState::Sent { event_id } => Some(event_id),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
||||
/// Clone the current event item, and update its `send_state`.
|
||||
pub(super) fn with_send_state(&self, send_state: EventSendState) -> Self {
|
||||
Self { send_state, ..self.clone() }
|
||||
}
|
||||
}
|
||||
|
||||
impl From<LocalEventTimelineItem> for EventTimelineItem {
|
||||
|
||||
@@ -20,10 +20,10 @@ use ruma::{
|
||||
AnySyncTimelineEvent,
|
||||
},
|
||||
serde::Raw,
|
||||
MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedTransactionId, OwnedUserId, RoomId,
|
||||
EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedTransactionId, OwnedUserId, RoomId,
|
||||
TransactionId, UserId,
|
||||
};
|
||||
use tracing::{debug, error, info, warn};
|
||||
use tracing::{debug, error, field::debug, info, warn};
|
||||
#[cfg(feature = "e2e-encryption")]
|
||||
use tracing::{instrument, trace};
|
||||
|
||||
@@ -32,7 +32,7 @@ use super::{
|
||||
update_read_marker, Flow, HandleEventResult, TimelineEventHandler, TimelineEventKind,
|
||||
TimelineEventMetadata, TimelineItemPosition,
|
||||
},
|
||||
rfind_event_item, EventTimelineItem, Profile, TimelineItem,
|
||||
rfind_event_item, EventSendState, EventTimelineItem, Profile, TimelineItem,
|
||||
};
|
||||
use crate::{
|
||||
events::SyncTimelineEventWithoutContent,
|
||||
@@ -160,36 +160,40 @@ impl<P: ProfileProvider> TimelineInner<P> {
|
||||
) -> crate::error::Result<()> {
|
||||
match response {
|
||||
Ok(response) => {
|
||||
self.update_event_id_of_local_event(txn_id, Some(response.event_id));
|
||||
self.update_event_send_state(
|
||||
txn_id,
|
||||
EventSendState::Sent { event_id: response.event_id },
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
Err(error) => {
|
||||
self.update_event_id_of_local_event(txn_id, None);
|
||||
self.update_event_send_state(txn_id, EventSendState::SendingFailed);
|
||||
|
||||
Err(error)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Update the event ID of a local event represented by a transaction ID.
|
||||
///
|
||||
/// If the event ID is `None`, it means there is no event ID returned by the
|
||||
/// server, so the sending has failed. If the event ID is `Some(_)`, it
|
||||
/// means the sending has been successful.
|
||||
/// Update the send state of a local event represented by a transaction ID.
|
||||
///
|
||||
/// If no local event is found, a warning is raised.
|
||||
pub(super) fn update_event_id_of_local_event(
|
||||
pub(super) fn update_event_send_state(
|
||||
&self,
|
||||
txn_id: &TransactionId,
|
||||
event_id: Option<OwnedEventId>,
|
||||
send_state: EventSendState,
|
||||
) {
|
||||
let mut lock = self.items.lock_mut();
|
||||
|
||||
let new_event_id: Option<&EventId> = match &send_state {
|
||||
EventSendState::Sent { event_id } => Some(event_id),
|
||||
_ => None,
|
||||
};
|
||||
|
||||
// Look for the local event by the transaction ID or event ID.
|
||||
let result = rfind_event_item(&lock, |it| {
|
||||
it.transaction_id() == Some(txn_id)
|
||||
|| event_id.is_some() && it.event_id() == event_id.as_deref()
|
||||
|| new_event_id.is_some() && it.event_id() == new_event_id
|
||||
});
|
||||
|
||||
let Some((idx, item)) = result else {
|
||||
@@ -204,16 +208,15 @@ impl<P: ProfileProvider> TimelineInner<P> {
|
||||
return;
|
||||
};
|
||||
|
||||
// An event ID already exists, that's a broken state, let's emit an
|
||||
// error but also override to the given event ID.
|
||||
if let Some(existing_event_id) = &item.event_id {
|
||||
error!(
|
||||
?existing_event_id, new_event_id = ?event_id, ?txn_id,
|
||||
"Local echo already has an event ID"
|
||||
);
|
||||
// The event was already marked as sent, that's a broken state, let's
|
||||
// emit an error but also override to the given sent state.
|
||||
if let EventSendState::Sent { event_id: existing_event_id } = &item.send_state {
|
||||
let new_event_id = new_event_id.map(debug);
|
||||
error!(?existing_event_id, ?new_event_id, ?txn_id, "Local echo already marked as sent");
|
||||
}
|
||||
|
||||
lock.set_cloned(idx, Arc::new(TimelineItem::Event(item.with_event_id(event_id).into())));
|
||||
let new_item = TimelineItem::Event(item.with_send_state(send_state).into());
|
||||
lock.set_cloned(idx, Arc::new(new_item));
|
||||
}
|
||||
|
||||
/// Handle a back-paginated event.
|
||||
|
||||
@@ -394,11 +394,12 @@ async fn remote_echo_full_trip() {
|
||||
// Scenario 2: The local event has not been sent to the server successfully, it
|
||||
// has failed. In this case, there is no event ID.
|
||||
{
|
||||
let event_id = None;
|
||||
timeline.inner.update_event_send_state(&txn_id, EventSendState::SendingFailed);
|
||||
|
||||
timeline.inner.update_event_id_of_local_event(&txn_id, event_id);
|
||||
|
||||
let item = assert_matches!(stream.next().await, Some(VecDiff::UpdateAt { value, index: 1 }) => value);
|
||||
let item = assert_matches!(
|
||||
stream.next().await,
|
||||
Some(VecDiff::UpdateAt { value, index: 1 }) => value
|
||||
);
|
||||
let event = item.as_event().unwrap().as_local().unwrap();
|
||||
assert_eq!(event.send_state, EventSendState::SendingFailed);
|
||||
}
|
||||
@@ -408,11 +409,17 @@ async fn remote_echo_full_trip() {
|
||||
let event_id = {
|
||||
let event_id = event_id!("$W6mZSLWMmfuQQ9jhZWeTxFIM");
|
||||
|
||||
timeline.inner.update_event_id_of_local_event(&txn_id, Some(event_id.to_owned()));
|
||||
timeline.inner.update_event_send_state(
|
||||
&txn_id,
|
||||
EventSendState::Sent { event_id: event_id.to_owned() },
|
||||
);
|
||||
|
||||
let item = assert_matches!(stream.next().await, Some(VecDiff::UpdateAt { value, index: 1 }) => value);
|
||||
let item = assert_matches!(
|
||||
stream.next().await,
|
||||
Some(VecDiff::UpdateAt { value, index: 1 }) => value
|
||||
);
|
||||
let event = item.as_event().unwrap().as_local().unwrap();
|
||||
assert_eq!(event.send_state, EventSendState::Sent);
|
||||
assert_matches!(event.send_state, EventSendState::Sent { .. });
|
||||
|
||||
event_id
|
||||
};
|
||||
|
||||
@@ -8,7 +8,8 @@ use futures_util::StreamExt;
|
||||
use matrix_sdk::{
|
||||
config::SyncSettings,
|
||||
room::timeline::{
|
||||
AnyOtherFullStateEventContent, PaginationOptions, TimelineItemContent, VirtualTimelineItem,
|
||||
AnyOtherFullStateEventContent, EventSendState, PaginationOptions, TimelineItemContent,
|
||||
VirtualTimelineItem,
|
||||
},
|
||||
ruma::MilliSecondsSinceUnixEpoch,
|
||||
};
|
||||
@@ -186,7 +187,7 @@ async fn echo() {
|
||||
let local_echo =
|
||||
assert_matches!(timeline_stream.next().await, Some(VecDiff::Push { value }) => value);
|
||||
let item = local_echo.as_event().unwrap().as_local().unwrap();
|
||||
assert!(item.event_id.is_none());
|
||||
assert_matches!(&item.send_state, EventSendState::NotSentYet);
|
||||
|
||||
let msg = assert_matches!(&item.content, TimelineItemContent::Message(msg) => msg);
|
||||
let text = assert_matches!(msg.msgtype(), MessageType::Text(text) => text);
|
||||
@@ -200,7 +201,7 @@ async fn echo() {
|
||||
Some(VecDiff::UpdateAt { index: 1, value }) => value
|
||||
);
|
||||
let item = sent_confirmation.as_event().unwrap().as_local().unwrap();
|
||||
assert!(item.event_id.is_some());
|
||||
assert_matches!(&item.send_state, EventSendState::Sent { .. });
|
||||
|
||||
ev_builder.add_joined_room(JoinedRoomBuilder::new(room_id).add_timeline_event(
|
||||
TimelineTestEvent::Custom(json!({
|
||||
|
||||
Reference in New Issue
Block a user