mirror of
https://github.com/matrix-org/matrix-rust-sdk.git
synced 2026-05-18 13:40:55 -04:00
feat(sdk): Add FailedToParse timeline items
This commit is contained in:
committed by
Jonas Platte
parent
6a1edf133d
commit
c808a72914
147
crates/matrix-sdk/src/events.rs
Normal file
147
crates/matrix-sdk/src/events.rs
Normal file
@@ -0,0 +1,147 @@
|
||||
use ruma::{
|
||||
events::{
|
||||
EventContent, MessageLikeEventContent, MessageLikeEventType, OriginalSyncMessageLikeEvent,
|
||||
OriginalSyncStateEvent, RedactedEventContent, RedactedMessageLikeEventContent,
|
||||
RedactedStateEventContent, RedactedSyncMessageLikeEvent, RedactedSyncStateEvent, Relations,
|
||||
StateEventContent, StateEventType, StateUnsigned,
|
||||
},
|
||||
serde::from_raw_json_value,
|
||||
EventId, MilliSecondsSinceUnixEpoch, TransactionId, UserId,
|
||||
};
|
||||
use serde::{de, Deserialize, Serialize};
|
||||
use serde_json::value::RawValue as RawJsonValue;
|
||||
|
||||
#[allow(clippy::large_enum_variant)]
|
||||
pub(crate) enum SyncTimelineEventWithoutContent {
|
||||
OriginalMessageLike(OriginalSyncMessageLikeEvent<NoMessageLikeEventContent>),
|
||||
RedactedMessageLike(RedactedSyncMessageLikeEvent<NoMessageLikeEventContent>),
|
||||
OriginalState(OriginalSyncStateEvent<NoStateEventContent>),
|
||||
RedactedState(RedactedSyncStateEvent<NoStateEventContent>),
|
||||
}
|
||||
|
||||
impl SyncTimelineEventWithoutContent {
|
||||
pub(crate) fn event_id(&self) -> &EventId {
|
||||
match self {
|
||||
Self::OriginalMessageLike(ev) => &ev.event_id,
|
||||
Self::RedactedMessageLike(ev) => &ev.event_id,
|
||||
Self::OriginalState(ev) => &ev.event_id,
|
||||
Self::RedactedState(ev) => &ev.event_id,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn origin_server_ts(&self) -> MilliSecondsSinceUnixEpoch {
|
||||
match self {
|
||||
SyncTimelineEventWithoutContent::OriginalMessageLike(ev) => ev.origin_server_ts,
|
||||
SyncTimelineEventWithoutContent::RedactedMessageLike(ev) => ev.origin_server_ts,
|
||||
SyncTimelineEventWithoutContent::OriginalState(ev) => ev.origin_server_ts,
|
||||
SyncTimelineEventWithoutContent::RedactedState(ev) => ev.origin_server_ts,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn relations(&self) -> Option<&Relations> {
|
||||
match self {
|
||||
SyncTimelineEventWithoutContent::OriginalMessageLike(ev) => {
|
||||
ev.unsigned.relations.as_ref()
|
||||
}
|
||||
SyncTimelineEventWithoutContent::OriginalState(ev) => ev.unsigned.relations.as_ref(),
|
||||
SyncTimelineEventWithoutContent::RedactedMessageLike(_)
|
||||
| SyncTimelineEventWithoutContent::RedactedState(_) => None,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn sender(&self) -> &UserId {
|
||||
match self {
|
||||
Self::OriginalMessageLike(ev) => &ev.sender,
|
||||
Self::RedactedMessageLike(ev) => &ev.sender,
|
||||
Self::OriginalState(ev) => &ev.sender,
|
||||
Self::RedactedState(ev) => &ev.sender,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn transaction_id(&self) -> Option<&TransactionId> {
|
||||
match self {
|
||||
SyncTimelineEventWithoutContent::OriginalMessageLike(ev) => {
|
||||
ev.unsigned.transaction_id.as_deref()
|
||||
}
|
||||
SyncTimelineEventWithoutContent::OriginalState(ev) => {
|
||||
ev.unsigned.transaction_id.as_deref()
|
||||
}
|
||||
SyncTimelineEventWithoutContent::RedactedMessageLike(_)
|
||||
| SyncTimelineEventWithoutContent::RedactedState(_) => None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Deserialize)]
|
||||
struct EventDeHelper {
|
||||
state_key: Option<de::IgnoredAny>,
|
||||
#[serde(default)]
|
||||
unsigned: UnsignedDeHelper,
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Default)]
|
||||
struct UnsignedDeHelper {
|
||||
redacted_because: Option<de::IgnoredAny>,
|
||||
}
|
||||
|
||||
impl<'de> Deserialize<'de> for SyncTimelineEventWithoutContent {
|
||||
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
|
||||
where
|
||||
D: de::Deserializer<'de>,
|
||||
{
|
||||
let json = Box::<RawJsonValue>::deserialize(deserializer)?;
|
||||
let EventDeHelper { state_key, unsigned } = from_raw_json_value(&json)?;
|
||||
|
||||
Ok(match (state_key.is_some(), unsigned.redacted_because.is_some()) {
|
||||
(false, false) => Self::OriginalMessageLike(from_raw_json_value(&json)?),
|
||||
(false, true) => Self::RedactedMessageLike(from_raw_json_value(&json)?),
|
||||
(true, false) => Self::OriginalState(from_raw_json_value(&json)?),
|
||||
(true, true) => Self::RedactedState(from_raw_json_value(&json)?),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
pub(crate) struct NoMessageLikeEventContent {
|
||||
#[serde(skip)]
|
||||
pub event_type: MessageLikeEventType,
|
||||
}
|
||||
|
||||
impl EventContent for NoMessageLikeEventContent {
|
||||
type EventType = MessageLikeEventType;
|
||||
|
||||
fn event_type(&self) -> Self::EventType {
|
||||
self.event_type.clone()
|
||||
}
|
||||
|
||||
fn from_parts(event_type: &str, _content: &RawJsonValue) -> serde_json::Result<Self> {
|
||||
Ok(Self { event_type: event_type.into() })
|
||||
}
|
||||
}
|
||||
impl MessageLikeEventContent for NoMessageLikeEventContent {}
|
||||
impl RedactedEventContent for NoMessageLikeEventContent {}
|
||||
impl RedactedMessageLikeEventContent for NoMessageLikeEventContent {}
|
||||
|
||||
#[derive(Clone, Debug, Serialize)]
|
||||
pub(crate) struct NoStateEventContent {
|
||||
#[serde(skip)]
|
||||
pub event_type: StateEventType,
|
||||
}
|
||||
|
||||
impl EventContent for NoStateEventContent {
|
||||
type EventType = StateEventType;
|
||||
|
||||
fn event_type(&self) -> Self::EventType {
|
||||
self.event_type.clone()
|
||||
}
|
||||
|
||||
fn from_parts(event_type: &str, _content: &RawJsonValue) -> serde_json::Result<Self> {
|
||||
Ok(Self { event_type: event_type.into() })
|
||||
}
|
||||
}
|
||||
impl StateEventContent for NoStateEventContent {
|
||||
type StateKey = String;
|
||||
type Unsigned = StateUnsigned<Self>;
|
||||
}
|
||||
impl RedactedEventContent for NoStateEventContent {}
|
||||
impl RedactedStateEventContent for NoStateEventContent {}
|
||||
@@ -43,6 +43,8 @@ mod sliding_sync;
|
||||
|
||||
#[cfg(feature = "e2e-encryption")]
|
||||
pub mod encryption;
|
||||
#[cfg(feature = "experimental-timeline")]
|
||||
mod events;
|
||||
|
||||
pub use account::Account;
|
||||
#[cfg(feature = "sso-login")]
|
||||
|
||||
@@ -35,7 +35,7 @@ use ruma::{
|
||||
},
|
||||
},
|
||||
AnyMessageLikeEventContent, AnyStateEventContent, AnySyncMessageLikeEvent,
|
||||
AnySyncTimelineEvent, Relations,
|
||||
AnySyncTimelineEvent, MessageLikeEventType, Relations, StateEventType,
|
||||
},
|
||||
serde::Raw,
|
||||
uint, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedTransactionId, OwnedUserId,
|
||||
@@ -48,6 +48,7 @@ use super::{
|
||||
find_event, find_fully_read, EventTimelineItem, Message, TimelineInner, TimelineInnerMetadata,
|
||||
TimelineItem, TimelineItemContent, TimelineKey, VirtualTimelineItem,
|
||||
};
|
||||
use crate::events::SyncTimelineEventWithoutContent;
|
||||
|
||||
impl TimelineInner {
|
||||
pub(super) async fn handle_live_event(
|
||||
@@ -221,32 +222,38 @@ fn handle_remote_event(
|
||||
timeline_items: &mut MutableVecLockMut<'_, Arc<TimelineItem>>,
|
||||
timeline_meta: &mut MutexGuard<'_, TimelineInnerMetadata>,
|
||||
) {
|
||||
let event = match raw.deserialize() {
|
||||
Ok(ev) => ev,
|
||||
Err(_e) => {
|
||||
// TODO: Add some sort of error timeline item
|
||||
return;
|
||||
}
|
||||
};
|
||||
let (event_id, sender, origin_server_ts, txn_id, relations, event_kind) =
|
||||
match raw.deserialize() {
|
||||
Ok(event) => (
|
||||
event.event_id().to_owned(),
|
||||
event.sender().to_owned(),
|
||||
event.origin_server_ts(),
|
||||
event.transaction_id().map(ToOwned::to_owned),
|
||||
event.relations().cloned(),
|
||||
event.into(),
|
||||
),
|
||||
Err(e) => match raw.deserialize_as::<SyncTimelineEventWithoutContent>() {
|
||||
Ok(event) => (
|
||||
event.event_id().to_owned(),
|
||||
event.sender().to_owned(),
|
||||
event.origin_server_ts(),
|
||||
event.transaction_id().map(ToOwned::to_owned),
|
||||
event.relations().cloned(),
|
||||
TimelineEventKind::failed_to_parse(event, e),
|
||||
),
|
||||
Err(e) => {
|
||||
warn!("Failed to deserialize timeline event: {e}");
|
||||
return;
|
||||
}
|
||||
},
|
||||
};
|
||||
|
||||
let sender = event.sender().to_owned();
|
||||
let is_own_event = sender == own_user_id;
|
||||
let event_meta = TimelineEventMetadata {
|
||||
sender,
|
||||
is_own_event,
|
||||
relations: event.relations().cloned(),
|
||||
encryption_info,
|
||||
};
|
||||
let flow = Flow::Remote {
|
||||
event_id: event.event_id().to_owned(),
|
||||
origin_server_ts: event.origin_server_ts(),
|
||||
raw_event: raw,
|
||||
txn_id: event.transaction_id().map(ToOwned::to_owned),
|
||||
position,
|
||||
};
|
||||
let event_meta = TimelineEventMetadata { sender, is_own_event, relations, encryption_info };
|
||||
let flow = Flow::Remote { event_id, origin_server_ts, raw_event: raw, txn_id, position };
|
||||
|
||||
TimelineEventHandler::new(event_meta, flow, timeline_items, timeline_meta)
|
||||
.handle_event(event.into())
|
||||
.handle_event(event_kind)
|
||||
}
|
||||
|
||||
fn update_fully_read_item(
|
||||
@@ -320,12 +327,52 @@ struct TimelineEventMetadata {
|
||||
|
||||
#[derive(Clone)]
|
||||
enum TimelineEventKind {
|
||||
Message { content: AnyMessageLikeEventContent },
|
||||
Message {
|
||||
content: AnyMessageLikeEventContent,
|
||||
},
|
||||
RedactedMessage,
|
||||
Redaction { redacts: OwnedEventId, content: RoomRedactionEventContent },
|
||||
Redaction {
|
||||
redacts: OwnedEventId,
|
||||
content: RoomRedactionEventContent,
|
||||
},
|
||||
// FIXME: Split further for state keys of different type
|
||||
State { _content: AnyStateEventContent },
|
||||
State {
|
||||
_content: AnyStateEventContent,
|
||||
},
|
||||
RedactedState, // AnyRedactedStateEventContent
|
||||
FailedToParseMessageLike {
|
||||
event_type: MessageLikeEventType,
|
||||
error: Arc<serde_json::Error>,
|
||||
},
|
||||
FailedToParseState {
|
||||
event_type: StateEventType,
|
||||
state_key: String,
|
||||
error: Arc<serde_json::Error>,
|
||||
},
|
||||
}
|
||||
|
||||
impl TimelineEventKind {
|
||||
fn failed_to_parse(event: SyncTimelineEventWithoutContent, error: serde_json::Error) -> Self {
|
||||
let error = Arc::new(error);
|
||||
match event {
|
||||
SyncTimelineEventWithoutContent::OriginalMessageLike(ev) => {
|
||||
Self::FailedToParseMessageLike { event_type: ev.content.event_type, error }
|
||||
}
|
||||
SyncTimelineEventWithoutContent::RedactedMessageLike(ev) => {
|
||||
Self::FailedToParseMessageLike { event_type: ev.content.event_type, error }
|
||||
}
|
||||
SyncTimelineEventWithoutContent::OriginalState(ev) => Self::FailedToParseState {
|
||||
event_type: ev.content.event_type,
|
||||
state_key: ev.state_key,
|
||||
error,
|
||||
},
|
||||
SyncTimelineEventWithoutContent::RedactedState(ev) => Self::FailedToParseState {
|
||||
event_type: ev.content.event_type,
|
||||
state_key: ev.state_key,
|
||||
error,
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<AnySyncTimelineEvent> for TimelineEventKind {
|
||||
@@ -404,8 +451,15 @@ impl<'a, 'i> TimelineEventHandler<'a, 'i> {
|
||||
TimelineEventKind::Redaction { redacts, content } => {
|
||||
self.handle_redaction(redacts, content)
|
||||
}
|
||||
// TODO: State events
|
||||
_ => {}
|
||||
TimelineEventKind::State { .. } | TimelineEventKind::RedactedState => {
|
||||
// TODO
|
||||
}
|
||||
TimelineEventKind::FailedToParseMessageLike { event_type, error } => {
|
||||
self.add(NewEventTimelineItem::failed_to_parse_message_like(event_type, error));
|
||||
}
|
||||
TimelineEventKind::FailedToParseState { event_type, state_key, error } => {
|
||||
self.add(NewEventTimelineItem::failed_to_parse_state(event_type, state_key, error));
|
||||
}
|
||||
}
|
||||
|
||||
if !self.event_added {
|
||||
@@ -449,6 +503,14 @@ impl<'a, 'i> TimelineEventHandler<'a, 'i> {
|
||||
);
|
||||
return None;
|
||||
}
|
||||
TimelineItemContent::FailedToParseMessageLike { .. }
|
||||
| TimelineItemContent::FailedToParseState { .. } => {
|
||||
info!(
|
||||
%event_id,
|
||||
"Edit event applies to event that couldn't be parsed, discarding"
|
||||
);
|
||||
return None;
|
||||
}
|
||||
};
|
||||
|
||||
let content = TimelineItemContent::Message(Message {
|
||||
@@ -705,6 +767,21 @@ impl NewEventTimelineItem {
|
||||
Self::from_content(TimelineItemContent::RedactedMessage)
|
||||
}
|
||||
|
||||
fn failed_to_parse_message_like(
|
||||
event_type: MessageLikeEventType,
|
||||
error: Arc<serde_json::Error>,
|
||||
) -> NewEventTimelineItem {
|
||||
Self::from_content(TimelineItemContent::FailedToParseMessageLike { event_type, error })
|
||||
}
|
||||
|
||||
fn failed_to_parse_state(
|
||||
event_type: StateEventType,
|
||||
state_key: String,
|
||||
error: Arc<serde_json::Error>,
|
||||
) -> NewEventTimelineItem {
|
||||
Self::from_content(TimelineItemContent::FailedToParseState { event_type, state_key, error })
|
||||
}
|
||||
|
||||
fn from_content(content: TimelineItemContent) -> Self {
|
||||
Self { content, reactions: BundledReactions::default() }
|
||||
}
|
||||
|
||||
@@ -12,7 +12,7 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::fmt;
|
||||
use std::{fmt, sync::Arc};
|
||||
|
||||
use indexmap::IndexMap;
|
||||
use matrix_sdk_base::deserialized_responses::EncryptionInfo;
|
||||
@@ -23,7 +23,7 @@ use ruma::{
|
||||
encrypted::{EncryptedEventScheme, MegolmV1AesSha2Content, RoomEncryptedEventContent},
|
||||
message::MessageType,
|
||||
},
|
||||
AnySyncTimelineEvent,
|
||||
AnySyncTimelineEvent, MessageLikeEventType, StateEventType,
|
||||
},
|
||||
serde::Raw,
|
||||
uint, EventId, MilliSecondsSinceUnixEpoch, OwnedDeviceId, OwnedEventId, OwnedTransactionId,
|
||||
@@ -266,6 +266,27 @@ pub enum TimelineItemContent {
|
||||
|
||||
/// An `m.room.encrypted` event that could not be decrypted.
|
||||
UnableToDecrypt(EncryptedMessage),
|
||||
|
||||
/// A message-like event that failed to deserialize.
|
||||
FailedToParseMessageLike {
|
||||
/// The event `type`.
|
||||
event_type: MessageLikeEventType,
|
||||
|
||||
/// The deserialization error.
|
||||
error: Arc<serde_json::Error>,
|
||||
},
|
||||
|
||||
/// A state event that failed to deserialize.
|
||||
FailedToParseState {
|
||||
/// The event `type`.
|
||||
event_type: StateEventType,
|
||||
|
||||
/// The state key.
|
||||
state_key: String,
|
||||
|
||||
/// The deserialization error.
|
||||
error: Arc<serde_json::Error>,
|
||||
},
|
||||
}
|
||||
|
||||
impl TimelineItemContent {
|
||||
|
||||
@@ -27,7 +27,7 @@ use matrix_sdk_base::crypto::OlmMachine;
|
||||
use matrix_sdk_test::async_test;
|
||||
use once_cell::sync::Lazy;
|
||||
use ruma::{
|
||||
assign,
|
||||
assign, event_id,
|
||||
events::{
|
||||
reaction::{self, ReactionEventContent},
|
||||
room::{
|
||||
@@ -37,16 +37,18 @@ use ruma::{
|
||||
message::{self, MessageType, Replacement, RoomMessageEventContent},
|
||||
redaction::OriginalSyncRoomRedactionEvent,
|
||||
},
|
||||
MessageLikeEventContent, OriginalSyncMessageLikeEvent,
|
||||
MessageLikeEventContent, MessageLikeEventType, OriginalSyncMessageLikeEvent,
|
||||
StateEventType,
|
||||
},
|
||||
room_id,
|
||||
serde::Raw,
|
||||
server_name, user_id, EventId, MilliSecondsSinceUnixEpoch, OwnedUserId, UserId,
|
||||
server_name, uint, user_id, EventId, MilliSecondsSinceUnixEpoch, OwnedUserId, UserId,
|
||||
};
|
||||
use serde_json::{json, Value as JsonValue};
|
||||
|
||||
use super::{
|
||||
EncryptedMessage, TimelineInner, TimelineItem, TimelineItemContent, VirtualTimelineItem,
|
||||
EncryptedMessage, TimelineInner, TimelineItem, TimelineItemContent, TimelineKey,
|
||||
VirtualTimelineItem,
|
||||
};
|
||||
|
||||
static ALICE: Lazy<&UserId> = Lazy::new(|| user_id!("@alice:server.name"));
|
||||
@@ -258,6 +260,90 @@ async fn update_read_marker() {
|
||||
assert_matches!(stream.next().await, Some(VecDiff::Move { old_index: 1, new_index: 2 }));
|
||||
}
|
||||
|
||||
#[async_test]
|
||||
async fn invalid_event_content() {
|
||||
let timeline = TestTimeline::new(&ALICE);
|
||||
let mut stream = timeline.stream();
|
||||
|
||||
// m.room.message events must have a msgtype and body in content, so this
|
||||
// event with an empty content object should fail to deserialize.
|
||||
timeline
|
||||
.handle_live_custom_event(json!({
|
||||
"content": {},
|
||||
"event_id": "$eeG0HA0FAZ37wP8kXlNkxx3I",
|
||||
"origin_server_ts": 10,
|
||||
"sender": "@alice:example.org",
|
||||
"type": "m.room.message",
|
||||
}))
|
||||
.await;
|
||||
|
||||
let item = assert_matches!(stream.next().await, Some(VecDiff::Push { value }) => value);
|
||||
let event_item = item.as_event().unwrap();
|
||||
assert_eq!(event_item.sender(), "@alice:example.org");
|
||||
assert_eq!(
|
||||
*event_item.key(),
|
||||
TimelineKey::EventId(event_id!("$eeG0HA0FAZ37wP8kXlNkxx3I").to_owned())
|
||||
);
|
||||
assert_eq!(event_item.origin_server_ts(), Some(MilliSecondsSinceUnixEpoch(uint!(10))));
|
||||
let event_type = assert_matches!(
|
||||
event_item.content(),
|
||||
TimelineItemContent::FailedToParseMessageLike { event_type, .. } => event_type
|
||||
);
|
||||
assert_eq!(*event_type, MessageLikeEventType::RoomMessage);
|
||||
|
||||
// Similar to above, the m.room.member state event must also not have an
|
||||
// empty content object.
|
||||
timeline
|
||||
.handle_live_custom_event(json!({
|
||||
"content": {},
|
||||
"event_id": "$d5G0HA0FAZ37wP8kXlNkxx3I",
|
||||
"origin_server_ts": 2179,
|
||||
"sender": "@alice:example.org",
|
||||
"type": "m.room.member",
|
||||
"state_key": "@alice:example.org",
|
||||
}))
|
||||
.await;
|
||||
|
||||
let item = assert_matches!(stream.next().await, Some(VecDiff::Push { value }) => value);
|
||||
let event_item = item.as_event().unwrap();
|
||||
assert_eq!(event_item.sender(), "@alice:example.org");
|
||||
assert_eq!(
|
||||
*event_item.key(),
|
||||
TimelineKey::EventId(event_id!("$d5G0HA0FAZ37wP8kXlNkxx3I").to_owned())
|
||||
);
|
||||
assert_eq!(event_item.origin_server_ts(), Some(MilliSecondsSinceUnixEpoch(uint!(2179))));
|
||||
let (event_type, state_key) = assert_matches!(
|
||||
event_item.content(),
|
||||
TimelineItemContent::FailedToParseState {
|
||||
event_type,
|
||||
state_key,
|
||||
..
|
||||
} => (event_type, state_key)
|
||||
);
|
||||
assert_eq!(*event_type, StateEventType::RoomMember);
|
||||
assert_eq!(*state_key, "@alice:example.org");
|
||||
}
|
||||
|
||||
#[async_test]
|
||||
async fn invalid_event() {
|
||||
let timeline = TestTimeline::new(&ALICE);
|
||||
|
||||
// This event is missing the sender field which the homeserver must add to
|
||||
// all timeline events. Because the event is malformed, it will be ignored.
|
||||
timeline
|
||||
.handle_live_custom_event(json!({
|
||||
"content": {
|
||||
"body": "hello world",
|
||||
"msgtype": "m.text"
|
||||
},
|
||||
"event_id": "$eeG0HA0FAZ37wP8kXlNkxx3I",
|
||||
"origin_server_ts": 10,
|
||||
"type": "m.room.message",
|
||||
}))
|
||||
.await;
|
||||
assert_eq!(timeline.inner.items.lock_ref().len(), 0);
|
||||
}
|
||||
|
||||
struct TestTimeline {
|
||||
own_user_id: OwnedUserId,
|
||||
inner: TimelineInner,
|
||||
|
||||
Reference in New Issue
Block a user