mirror of
https://github.com/matrix-org/matrix-rust-sdk.git
synced 2026-05-08 16:04:13 -04:00
feat(sdk): LocalEventTimelineItem has a new send_state field
feat(sdk): `LocalEventTimelineItem` has a new `send_state` field
This commit is contained in:
@@ -91,11 +91,12 @@ mod uniffi_types {
|
||||
},
|
||||
timeline::{
|
||||
EmoteMessageContent, EncryptedMessage, EventTimelineItem, FileInfo, FileMessageContent,
|
||||
FormattedBody, ImageInfo, ImageMessageContent, InsertAtData, MembershipChange, Message,
|
||||
MessageFormat, MessageType, NoticeMessageContent, OtherState, Profile, Reaction,
|
||||
TextMessageContent, ThumbnailInfo, TimelineChange, TimelineDiff, TimelineItem,
|
||||
TimelineItemContent, TimelineItemContentKind, UpdateAtData, VideoInfo,
|
||||
VideoMessageContent, VirtualTimelineItem,
|
||||
FormattedBody, ImageInfo, ImageMessageContent, InsertAtData,
|
||||
LocalEventTimelineItemSendState, MembershipChange, Message, MessageFormat, MessageType,
|
||||
NoticeMessageContent, OtherState, Profile, Reaction, TextMessageContent, ThumbnailInfo,
|
||||
TimelineChange, TimelineDiff, TimelineItem, TimelineItemContent,
|
||||
TimelineItemContentKind, UpdateAtData, VideoInfo, VideoMessageContent,
|
||||
VirtualTimelineItem,
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
@@ -170,6 +170,32 @@ impl TimelineItem {
|
||||
}
|
||||
}
|
||||
|
||||
/// This type represents the “send state” of a local event timeline item.
|
||||
#[derive(Clone, uniffi::Enum)]
|
||||
pub enum LocalEventTimelineItemSendState {
|
||||
/// The local event has not been sent yet.
|
||||
NotSendYet,
|
||||
/// The local event has been sent to the server, but unsuccessfully: The
|
||||
/// sending has failed.
|
||||
SendingFailed,
|
||||
/// The local event has been sent successfully to the server.
|
||||
Sent,
|
||||
}
|
||||
|
||||
impl From<matrix_sdk::room::timeline::LocalEventTimelineItemSendState>
|
||||
for LocalEventTimelineItemSendState
|
||||
{
|
||||
fn from(value: matrix_sdk::room::timeline::LocalEventTimelineItemSendState) -> Self {
|
||||
use matrix_sdk::room::timeline::LocalEventTimelineItemSendState::*;
|
||||
|
||||
match value {
|
||||
NotSentYet => Self::NotSendYet,
|
||||
SendingFailed => Self::SendingFailed,
|
||||
Sent => Self::Sent,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(uniffi::Object)]
|
||||
pub struct EventTimelineItem(pub(crate) matrix_sdk::room::timeline::EventTimelineItem);
|
||||
|
||||
@@ -234,6 +260,15 @@ impl EventTimelineItem {
|
||||
self.0.raw().map(|r| r.json().get().to_owned())
|
||||
}
|
||||
|
||||
pub fn local_send_state(&self) -> Option<LocalEventTimelineItemSendState> {
|
||||
use matrix_sdk::room::timeline::EventTimelineItem::*;
|
||||
|
||||
match &self.0 {
|
||||
Local(local_event) => Some(local_event.send_state.into()),
|
||||
Remote(_) => None,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn fmt_debug(&self) -> String {
|
||||
format!("{:#?}", self.0)
|
||||
}
|
||||
|
||||
@@ -43,8 +43,8 @@ use tracing::{debug, error, field::debug, info, instrument, trace, warn};
|
||||
use super::{
|
||||
event_item::{
|
||||
AnyOtherFullStateEventContent, BundledReactions, LocalEventTimelineItem,
|
||||
MemberProfileChange, OtherState, Profile, RemoteEventTimelineItem, RoomMembershipChange,
|
||||
Sticker, TimelineDetails,
|
||||
LocalEventTimelineItemSendState, MemberProfileChange, OtherState, Profile,
|
||||
RemoteEventTimelineItem, RoomMembershipChange, Sticker, TimelineDetails,
|
||||
},
|
||||
find_event_by_id, find_event_by_txn_id, find_read_marker, EventTimelineItem, Message,
|
||||
TimelineInnerMetadata, TimelineItem, TimelineItemContent, VirtualTimelineItem,
|
||||
@@ -240,13 +240,16 @@ impl<'a, 'i> TimelineEventHandler<'a, 'i> {
|
||||
#[instrument(skip_all, fields(txn_id, event_id, position))]
|
||||
pub(super) fn handle_event(mut self, event_kind: TimelineEventKind) -> HandleEventResult {
|
||||
let span = tracing::Span::current();
|
||||
|
||||
match &self.flow {
|
||||
Flow::Local { txn_id, .. } => {
|
||||
span.record("txn_id", debug(txn_id));
|
||||
}
|
||||
|
||||
Flow::Remote { event_id, txn_id, position, .. } => {
|
||||
span.record("event_id", debug(event_id));
|
||||
span.record("position", debug(position));
|
||||
|
||||
if let Some(txn_id) = txn_id {
|
||||
span.record("txn_id", debug(txn_id));
|
||||
}
|
||||
@@ -279,21 +282,27 @@ impl<'a, 'i> TimelineEventHandler<'a, 'i> {
|
||||
);
|
||||
}
|
||||
},
|
||||
|
||||
TimelineEventKind::RedactedMessage => {
|
||||
self.add(NewEventTimelineItem::redacted_message());
|
||||
}
|
||||
|
||||
TimelineEventKind::Redaction { redacts, content } => {
|
||||
self.handle_redaction(redacts, content);
|
||||
}
|
||||
|
||||
TimelineEventKind::RoomMember { user_id, content, sender } => {
|
||||
self.add(NewEventTimelineItem::room_member(user_id, content, sender));
|
||||
}
|
||||
|
||||
TimelineEventKind::OtherState { state_key, content } => {
|
||||
self.add(NewEventTimelineItem::other_state(state_key, content));
|
||||
}
|
||||
|
||||
TimelineEventKind::FailedToParseMessageLike { event_type, error } => {
|
||||
self.add(NewEventTimelineItem::failed_to_parse_message_like(event_type, error));
|
||||
}
|
||||
|
||||
TimelineEventKind::FailedToParseState { event_type, state_key, error } => {
|
||||
self.add(NewEventTimelineItem::failed_to_parse_state(event_type, state_key, error));
|
||||
}
|
||||
@@ -488,6 +497,7 @@ impl<'a, 'i> TimelineEventHandler<'a, 'i> {
|
||||
}
|
||||
}
|
||||
|
||||
/// Add a new event item in the timeline.
|
||||
fn add(&mut self, item: NewEventTimelineItem) {
|
||||
self.result.item_added = true;
|
||||
|
||||
@@ -500,6 +510,7 @@ impl<'a, 'i> TimelineEventHandler<'a, 'i> {
|
||||
match &self.flow {
|
||||
Flow::Local { txn_id, timestamp } => {
|
||||
EventTimelineItem::Local(LocalEventTimelineItem {
|
||||
send_state: LocalEventTimelineItemSendState::NotSentYet,
|
||||
transaction_id: txn_id.to_owned(),
|
||||
event_id: None,
|
||||
sender,
|
||||
|
||||
@@ -180,8 +180,22 @@ impl EventTimelineItem {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
/// This type represents the “send state” of a local event timeline item.
|
||||
#[derive(Debug, Copy, Clone, PartialEq)]
|
||||
pub enum LocalEventTimelineItemSendState {
|
||||
/// The local event has not been sent yet.
|
||||
NotSentYet,
|
||||
/// The local event has been sent to the server, but unsuccessfully: The
|
||||
/// sending has failed.
|
||||
SendingFailed,
|
||||
/// The local event has been sent successfully to the server.
|
||||
Sent,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct LocalEventTimelineItem {
|
||||
/// The send state of this local event.
|
||||
pub send_state: LocalEventTimelineItemSendState,
|
||||
/// The transaction ID.
|
||||
pub transaction_id: OwnedTransactionId,
|
||||
/// The event ID received from the server in the event-sending response.
|
||||
@@ -198,8 +212,23 @@ pub struct LocalEventTimelineItem {
|
||||
|
||||
impl LocalEventTimelineItem {
|
||||
/// Clone the current event item, and update its `event_id`.
|
||||
pub(super) fn with_event_id(&self, event_id: OwnedEventId) -> Self {
|
||||
Self { event_id: Some(event_id), ..self.clone() }
|
||||
///
|
||||
/// `event_id` is optional:
|
||||
/// * `Some(_)` means the local event has been sent successfully to the
|
||||
/// server, its send state will be moved to
|
||||
/// [`LocalEventTimelineItemSendState::Sent`].
|
||||
/// * `None` means the local event has been failed to be sent to the
|
||||
/// server, its send state will be moved to
|
||||
/// [`LocalEventTimelineItemSendState::SendingFailed`].
|
||||
pub(super) fn with_event_id(&self, event_id: Option<OwnedEventId>) -> Self {
|
||||
Self {
|
||||
send_state: match &event_id {
|
||||
Some(_) => LocalEventTimelineItemSendState::Sent,
|
||||
None => LocalEventTimelineItemSendState::SendingFailed,
|
||||
},
|
||||
event_id,
|
||||
..self.clone()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -13,6 +13,7 @@ use matrix_sdk_base::{
|
||||
locks::Mutex,
|
||||
};
|
||||
use ruma::{
|
||||
api::client::message::send_message_event::v3::Response as SendMessageEventResponse,
|
||||
events::{
|
||||
fully_read::FullyReadEvent, relation::Annotation, AnyMessageLikeEventContent,
|
||||
AnySyncTimelineEvent,
|
||||
@@ -109,6 +110,7 @@ impl<P: ProfileProvider> TimelineInner<P> {
|
||||
.await;
|
||||
}
|
||||
|
||||
/// Handle the creation of a new local event.
|
||||
pub(super) async fn handle_local_event(
|
||||
&self,
|
||||
txn_id: OwnedTransactionId,
|
||||
@@ -134,6 +136,68 @@ impl<P: ProfileProvider> TimelineInner<P> {
|
||||
.handle_event(kind);
|
||||
}
|
||||
|
||||
/// Handle the response returned by the server when a local event has been
|
||||
/// sent.
|
||||
pub(super) fn handle_local_event_send_response(
|
||||
&self,
|
||||
txn_id: &TransactionId,
|
||||
response: crate::error::Result<SendMessageEventResponse>,
|
||||
) -> crate::error::Result<()> {
|
||||
match response {
|
||||
Ok(response) => {
|
||||
self.update_event_id_of_local_event(txn_id, Some(response.event_id));
|
||||
|
||||
Ok(())
|
||||
}
|
||||
Err(error) => {
|
||||
self.update_event_id_of_local_event(txn_id, None);
|
||||
|
||||
Err(error)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Update the event ID of a local event represented by a transaction ID.
|
||||
///
|
||||
/// If the event ID is `None`, it means there is no event ID returned by the
|
||||
/// server, so the sending has failed. If the event ID is `Some(_)`, it
|
||||
/// means the sending has been successful.
|
||||
///
|
||||
/// If no local event is found, a warning is raised.
|
||||
pub(super) fn update_event_id_of_local_event(
|
||||
&self,
|
||||
txn_id: &TransactionId,
|
||||
event_id: Option<OwnedEventId>,
|
||||
) {
|
||||
let mut lock = self.items.lock_mut();
|
||||
|
||||
// Look for the local event by the transaction ID.
|
||||
if let Some((idx, local_event_item)) = find_event_by_txn_id(&lock, txn_id) {
|
||||
// An event ID already exists, that's a broken state, let's emit an error but
|
||||
// also override to the given event ID.
|
||||
if let Some(existing_event_id) = &local_event_item.event_id {
|
||||
error!(
|
||||
?existing_event_id, new_event_id = ?event_id, ?txn_id,
|
||||
"Local echo already has an event ID"
|
||||
);
|
||||
}
|
||||
|
||||
lock.set_cloned(
|
||||
idx,
|
||||
Arc::new(TimelineItem::Event(local_event_item.with_event_id(event_id).into())),
|
||||
);
|
||||
}
|
||||
// No local event has been found.
|
||||
else if let Some(event_id) = event_id {
|
||||
if find_event_by_id(&lock, &event_id).is_none() {
|
||||
// Event isn't found by transaction ID, and also not by event ID
|
||||
// (which it would if the remote echo comes in before the send-event
|
||||
// response)
|
||||
warn!(?txn_id, "Timeline item not found, can't add event ID");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Handle a back-paginated event.
|
||||
///
|
||||
/// Returns the number of timeline updates that were made.
|
||||
@@ -153,29 +217,6 @@ impl<P: ProfileProvider> TimelineInner<P> {
|
||||
.await
|
||||
}
|
||||
|
||||
/// Update the transaction ID by an event ID.
|
||||
pub(super) fn add_event_id(&self, txn_id: &TransactionId, event_id: OwnedEventId) {
|
||||
let mut lock = self.items.lock_mut();
|
||||
if let Some((idx, local_event_item)) = find_event_by_txn_id(&lock, txn_id) {
|
||||
if let Some(existing_event_id) = &local_event_item.event_id {
|
||||
error!(
|
||||
?existing_event_id, new_event_id = ?event_id, ?txn_id,
|
||||
"Local echo already has an event ID"
|
||||
);
|
||||
}
|
||||
|
||||
lock.set_cloned(
|
||||
idx,
|
||||
Arc::new(TimelineItem::Event(local_event_item.with_event_id(event_id).into())),
|
||||
);
|
||||
} else if find_event_by_id(&lock, &event_id).is_none() {
|
||||
// Event isn't found by transaction ID, and also not by event ID
|
||||
// (which it would if the remote echo comes in before the send-event
|
||||
// response)
|
||||
warn!(?txn_id, "Timeline item not found, can't add event ID");
|
||||
}
|
||||
}
|
||||
|
||||
#[instrument(skip_all)]
|
||||
pub(super) fn add_loading_indicator(&self) {
|
||||
let mut lock = self.items.lock_mut();
|
||||
|
||||
@@ -54,9 +54,10 @@ use self::{
|
||||
};
|
||||
pub use self::{
|
||||
event_item::{
|
||||
AnyOtherFullStateEventContent, EncryptedMessage, EventTimelineItem, MemberProfileChange,
|
||||
MembershipChange, Message, OtherState, Profile, ReactionDetails, RoomMembershipChange,
|
||||
Sticker, TimelineDetails, TimelineItemContent,
|
||||
AnyOtherFullStateEventContent, EncryptedMessage, EventTimelineItem,
|
||||
LocalEventTimelineItemSendState, MemberProfileChange, MembershipChange, Message,
|
||||
OtherState, Profile, ReactionDetails, RoomMembershipChange, Sticker, TimelineDetails,
|
||||
TimelineItemContent,
|
||||
},
|
||||
pagination::{PaginationOptions, PaginationOutcome},
|
||||
virtual_item::VirtualTimelineItem,
|
||||
@@ -371,10 +372,8 @@ impl Timeline {
|
||||
// Not ideal, but works for now.
|
||||
let room = Joined { inner: self.room().clone() };
|
||||
|
||||
let response = room.send(content, Some(&txn_id)).await?;
|
||||
self.inner.add_event_id(&txn_id, response.event_id);
|
||||
|
||||
Ok(())
|
||||
let response = room.send(content, Some(&txn_id)).await;
|
||||
self.inner.handle_local_event_send_response(&txn_id, response)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -58,6 +58,7 @@ use super::{
|
||||
EventTimelineItem, MembershipChange, Profile, TimelineInner, TimelineItem, TimelineItemContent,
|
||||
VirtualTimelineItem,
|
||||
};
|
||||
use crate::room::timeline::event_item::LocalEventTimelineItemSendState;
|
||||
|
||||
static ALICE: Lazy<&UserId> = Lazy::new(|| user_id!("@alice:server.name"));
|
||||
static BOB: Lazy<&UserId> = Lazy::new(|| user_id!("@bob:other.server"));
|
||||
@@ -370,7 +371,7 @@ async fn invalid_event() {
|
||||
}
|
||||
|
||||
#[async_test]
|
||||
async fn remote_echo_without_txn_id() {
|
||||
async fn remote_echo_full_trip() {
|
||||
let timeline = TestTimeline::new();
|
||||
let mut stream = timeline.stream();
|
||||
|
||||
@@ -383,18 +384,41 @@ async fn remote_echo_without_txn_id() {
|
||||
|
||||
let _day_divider = assert_matches!(stream.next().await, Some(VecDiff::Push { value }) => value);
|
||||
|
||||
let item = assert_matches!(stream.next().await, Some(VecDiff::Push { value }) => value);
|
||||
assert_matches!(item.as_event().unwrap(), EventTimelineItem::Local(_));
|
||||
// Scenario 1: The local event has not been sent yet to the server.
|
||||
{
|
||||
let item = assert_matches!(stream.next().await, Some(VecDiff::Push { value }) => value);
|
||||
let event = item.as_event().unwrap().as_local().unwrap();
|
||||
assert_eq!(event.send_state, LocalEventTimelineItemSendState::NotSentYet);
|
||||
}
|
||||
|
||||
// That has an event ID assigned already (from the response to sending it)…
|
||||
let event_id = event_id!("$W6mZSLWMmfuQQ9jhZWeTxFIM");
|
||||
timeline.inner.add_event_id(&txn_id, event_id.to_owned());
|
||||
// Scenario 2: The local event has not been sent to the server successfully, it
|
||||
// has failed. In this case, there is no event ID.
|
||||
{
|
||||
let event_id = None;
|
||||
|
||||
let item =
|
||||
assert_matches!(stream.next().await, Some(VecDiff::UpdateAt { value, index: 1 }) => value);
|
||||
assert_matches!(item.as_event().unwrap(), EventTimelineItem::Local(_));
|
||||
timeline.inner.update_event_id_of_local_event(&txn_id, event_id);
|
||||
|
||||
// When an event with the same ID comes in…
|
||||
let item = assert_matches!(stream.next().await, Some(VecDiff::UpdateAt { value, index: 1 }) => value);
|
||||
let event = item.as_event().unwrap().as_local().unwrap();
|
||||
assert_eq!(event.send_state, LocalEventTimelineItemSendState::SendingFailed);
|
||||
}
|
||||
|
||||
// Scenario 3: The local event has been sent successfully to the server and an
|
||||
// event ID has been received as part of the server's response.
|
||||
let event_id = {
|
||||
let event_id = event_id!("$W6mZSLWMmfuQQ9jhZWeTxFIM");
|
||||
|
||||
timeline.inner.update_event_id_of_local_event(&txn_id, Some(event_id.to_owned()));
|
||||
|
||||
let item = assert_matches!(stream.next().await, Some(VecDiff::UpdateAt { value, index: 1 }) => value);
|
||||
let event = item.as_event().unwrap().as_local().unwrap();
|
||||
assert_eq!(event.send_state, LocalEventTimelineItemSendState::Sent);
|
||||
|
||||
event_id
|
||||
};
|
||||
|
||||
// Now, a sync has been run against the server, and an event with the same ID
|
||||
// comes in.
|
||||
timeline
|
||||
.handle_live_custom_event(json!({
|
||||
"content": {
|
||||
@@ -410,6 +434,7 @@ async fn remote_echo_without_txn_id() {
|
||||
|
||||
// The local echo is removed
|
||||
assert_matches!(stream.next().await, Some(VecDiff::Pop { .. }));
|
||||
|
||||
// This day divider shouldn't be present, or the previous one should be
|
||||
// removed. There being a two day dividers in a row is a bug, but it's
|
||||
// non-trivial to fix and rare enough that we can fix it later (only happens
|
||||
|
||||
Reference in New Issue
Block a user