mirror of
https://github.com/matrix-org/matrix-rust-sdk.git
synced 2026-05-18 13:40:55 -04:00
timeline: get rid of the synthetic Timeline and JoinedRoomUpdate when updating the timeline
This commit is contained in:
@@ -21,7 +21,6 @@ use matrix_sdk::{
|
||||
executor::spawn,
|
||||
Room,
|
||||
};
|
||||
use matrix_sdk_base::sync::JoinedRoomUpdate;
|
||||
use ruma::{
|
||||
events::{receipt::ReceiptType, AnySyncTimelineEvent},
|
||||
RoomVersionId,
|
||||
@@ -194,24 +193,10 @@ impl TimelineBuilder {
|
||||
} => {
|
||||
trace!("Received new events");
|
||||
|
||||
// XXX this timeline and the joined room updates are synthetic, until
|
||||
// we get rid of `handle_joined_room_update` by adding all functionality
|
||||
// back in the event cache, and replacing it with a simple
|
||||
// TODO: (bnjbvr) account_data and ephemeral should be handled by the
|
||||
// event cache, and we should replace this with a simple
|
||||
// `handle_add_events`.
|
||||
let timeline = matrix_sdk_base::sync::Timeline {
|
||||
limited: false,
|
||||
prev_batch: None,
|
||||
events,
|
||||
};
|
||||
let update = JoinedRoomUpdate {
|
||||
unread_notifications: Default::default(),
|
||||
timeline,
|
||||
state: Default::default(),
|
||||
account_data,
|
||||
ephemeral,
|
||||
ambiguity_changes: Default::default(),
|
||||
};
|
||||
inner.handle_joined_room_update(update).await;
|
||||
inner.handle_joined_room_update(events, account_data, ephemeral).await;
|
||||
|
||||
let member_ambiguity_changes = ambiguity_changes
|
||||
.values()
|
||||
|
||||
@@ -24,9 +24,7 @@ use imbl::Vector;
|
||||
use itertools::Itertools;
|
||||
#[cfg(all(test, feature = "e2e-encryption"))]
|
||||
use matrix_sdk::crypto::OlmMachine;
|
||||
use matrix_sdk::{
|
||||
deserialized_responses::SyncTimelineEvent, sync::JoinedRoomUpdate, Error, Result, Room,
|
||||
};
|
||||
use matrix_sdk::{deserialized_responses::SyncTimelineEvent, Error, Result, Room};
|
||||
#[cfg(test)]
|
||||
use ruma::events::receipt::ReceiptEventContent;
|
||||
#[cfg(all(test, feature = "e2e-encryption"))]
|
||||
@@ -43,9 +41,10 @@ use ruma::{
|
||||
message::{MessageType, Relation},
|
||||
redaction::RoomRedactionEventContent,
|
||||
},
|
||||
AnyMessageLikeEventContent, AnySyncMessageLikeEvent, AnySyncTimelineEvent,
|
||||
MessageLikeEventType,
|
||||
AnyMessageLikeEventContent, AnyRoomAccountDataEvent, AnySyncEphemeralRoomEvent,
|
||||
AnySyncMessageLikeEvent, AnySyncTimelineEvent, MessageLikeEventType,
|
||||
},
|
||||
serde::Raw,
|
||||
EventId, OwnedEventId, OwnedTransactionId, RoomVersionId, TransactionId, UserId,
|
||||
};
|
||||
use tokio::sync::{RwLock, RwLockWriteGuard};
|
||||
@@ -426,9 +425,22 @@ impl<P: RoomDataProvider> TimelineInner<P> {
|
||||
self.state.write().await.clear();
|
||||
}
|
||||
|
||||
pub(super) async fn handle_joined_room_update(&self, update: JoinedRoomUpdate) {
|
||||
pub(super) async fn handle_joined_room_update(
|
||||
&self,
|
||||
events: Vec<SyncTimelineEvent>,
|
||||
account_data: Vec<Raw<AnyRoomAccountDataEvent>>,
|
||||
ephemeral: Vec<Raw<AnySyncEphemeralRoomEvent>>,
|
||||
) {
|
||||
let mut state = self.state.write().await;
|
||||
state.handle_joined_room_update(update, &self.room_data_provider, &self.settings).await;
|
||||
state
|
||||
.handle_joined_room_update(
|
||||
events,
|
||||
account_data,
|
||||
ephemeral,
|
||||
&self.room_data_provider,
|
||||
&self.settings,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
||||
@@ -22,8 +22,8 @@ use std::{
|
||||
|
||||
use eyeball_im::{ObservableVector, ObservableVectorTransaction, ObservableVectorTransactionEntry};
|
||||
use indexmap::IndexMap;
|
||||
use matrix_sdk::{deserialized_responses::SyncTimelineEvent, sync::Timeline};
|
||||
use matrix_sdk_base::{deserialized_responses::TimelineEvent, sync::JoinedRoomUpdate};
|
||||
use matrix_sdk::deserialized_responses::SyncTimelineEvent;
|
||||
use matrix_sdk_base::deserialized_responses::TimelineEvent;
|
||||
#[cfg(test)]
|
||||
use ruma::events::receipt::ReceiptEventContent;
|
||||
use ruma::{
|
||||
@@ -32,6 +32,7 @@ use ruma::{
|
||||
AnyMessageLikeEventContent, AnyRoomAccountDataEvent, AnySyncEphemeralRoomEvent,
|
||||
},
|
||||
push::Action,
|
||||
serde::Raw,
|
||||
EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedTransactionId, OwnedUserId,
|
||||
RoomVersionId, UserId,
|
||||
};
|
||||
@@ -105,38 +106,35 @@ impl TimelineInnerState {
|
||||
return Default::default();
|
||||
}
|
||||
|
||||
let mut total = HandleManyEventsResult::default();
|
||||
|
||||
let position = match position {
|
||||
TimelineEnd::Front => TimelineItemPosition::Start,
|
||||
TimelineEnd::Back { from_cache } => TimelineItemPosition::End { from_cache },
|
||||
};
|
||||
|
||||
let mut txn = self.transaction();
|
||||
for event in events {
|
||||
let handle_one_res =
|
||||
txn.handle_remote_event(event.into(), position, room_data_provider, settings).await;
|
||||
|
||||
total.items_added += handle_one_res.item_added as u64;
|
||||
total.items_updated += handle_one_res.items_updated as u64;
|
||||
}
|
||||
let handle_many_res =
|
||||
txn.add_events_at(events, position, room_data_provider, settings).await;
|
||||
txn.commit();
|
||||
|
||||
total
|
||||
handle_many_res
|
||||
}
|
||||
|
||||
#[instrument(skip_all)]
|
||||
pub(super) async fn handle_joined_room_update<P: RoomDataProvider>(
|
||||
&mut self,
|
||||
update: JoinedRoomUpdate,
|
||||
events: Vec<SyncTimelineEvent>,
|
||||
account_data: Vec<Raw<AnyRoomAccountDataEvent>>,
|
||||
ephemeral: Vec<Raw<AnySyncEphemeralRoomEvent>>,
|
||||
room_data_provider: &P,
|
||||
settings: &TimelineInnerSettings,
|
||||
) {
|
||||
let mut txn = self.transaction();
|
||||
txn.handle_sync_timeline(update.timeline, room_data_provider, settings).await;
|
||||
|
||||
txn.add_events_at(
|
||||
events,
|
||||
TimelineEnd::Back { from_cache: false },
|
||||
room_data_provider,
|
||||
settings,
|
||||
)
|
||||
.await;
|
||||
|
||||
trace!("Handling account data");
|
||||
for raw_event in update.account_data {
|
||||
for raw_event in account_data {
|
||||
match raw_event.deserialize() {
|
||||
Ok(AnyRoomAccountDataEvent::FullyRead(ev)) => {
|
||||
txn.set_fully_read_event(ev.content.event_id);
|
||||
@@ -149,10 +147,10 @@ impl TimelineInnerState {
|
||||
}
|
||||
}
|
||||
|
||||
if !update.ephemeral.is_empty() {
|
||||
if !ephemeral.is_empty() {
|
||||
trace!("Handling ephemeral room events");
|
||||
let own_user_id = room_data_provider.own_user_id();
|
||||
for raw_event in update.ephemeral {
|
||||
for raw_event in ephemeral {
|
||||
match raw_event.deserialize() {
|
||||
Ok(AnySyncEphemeralRoomEvent::Receipt(ev)) => {
|
||||
txn.handle_explicit_read_receipts(ev.content, own_user_id);
|
||||
@@ -439,28 +437,32 @@ pub(in crate::timeline) struct TimelineInnerStateTransaction<'a> {
|
||||
}
|
||||
|
||||
impl TimelineInnerStateTransaction<'_> {
|
||||
#[instrument(skip_all, fields(limited = timeline.limited))]
|
||||
async fn handle_sync_timeline<P: RoomDataProvider>(
|
||||
/// Add the given events at the given end of the timeline.
|
||||
#[tracing::instrument(skip(self, events, room_data_provider, settings))]
|
||||
pub(super) async fn add_events_at<P: RoomDataProvider>(
|
||||
&mut self,
|
||||
timeline: Timeline,
|
||||
events: Vec<impl Into<SyncTimelineEvent>>,
|
||||
position: TimelineEnd,
|
||||
room_data_provider: &P,
|
||||
settings: &TimelineInnerSettings,
|
||||
) {
|
||||
if timeline.limited {
|
||||
self.clear();
|
||||
) -> HandleManyEventsResult {
|
||||
let mut total = HandleManyEventsResult::default();
|
||||
|
||||
let position = match position {
|
||||
TimelineEnd::Front => TimelineItemPosition::Start,
|
||||
TimelineEnd::Back { from_cache } => TimelineItemPosition::End { from_cache },
|
||||
};
|
||||
|
||||
for event in events {
|
||||
let handle_one_res = self
|
||||
.handle_remote_event(event.into(), position, room_data_provider, settings)
|
||||
.await;
|
||||
|
||||
total.items_added += handle_one_res.item_added as u64;
|
||||
total.items_updated += handle_one_res.items_updated as u64;
|
||||
}
|
||||
|
||||
let num_events = timeline.events.len();
|
||||
for (i, event) in timeline.events.into_iter().enumerate() {
|
||||
trace!("Handling event {} out of {num_events}", i + 1);
|
||||
self.handle_remote_event(
|
||||
event,
|
||||
TimelineItemPosition::End { from_cache: false },
|
||||
room_data_provider,
|
||||
settings,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
total
|
||||
}
|
||||
|
||||
/// Handle a remote event.
|
||||
|
||||
@@ -53,7 +53,7 @@ impl super::Timeline {
|
||||
let num_events = events.len();
|
||||
trace!("Back-pagination succeeded with {num_events} events");
|
||||
|
||||
let handle_many_events_result =
|
||||
let handle_many_res =
|
||||
self.inner.add_events_at(events, TimelineEnd::Front).await;
|
||||
|
||||
if reached_start {
|
||||
@@ -68,8 +68,8 @@ impl super::Timeline {
|
||||
.total_events_received
|
||||
.checked_add(outcome.events_received)?;
|
||||
|
||||
outcome.items_added = handle_many_events_result.items_added;
|
||||
outcome.items_updated = handle_many_events_result.items_updated;
|
||||
outcome.items_added = handle_many_res.items_added;
|
||||
outcome.items_updated = handle_many_res.items_updated;
|
||||
outcome.total_items_added += outcome.items_added;
|
||||
outcome.total_items_updated += outcome.items_updated;
|
||||
|
||||
|
||||
Reference in New Issue
Block a user