mirror of
https://github.com/matrix-org/matrix-rust-sdk.git
synced 2026-05-13 02:25:51 -04:00
refactor(sdk): Add RoomEventCacheUpdate::UpdateTimelineEvents.
This patch adds a new variant to `RoomEventCacheUpdate`, namely `UpdateTimelineEvents. It's going to replace `AddTimelineEvents` soon once it's stable enough. This is a transition. They are read by the `Timeline` if and only if `TimelineSettings::vectordiffs_as_inputs` is turned on.
This commit is contained in:
@@ -34,6 +34,7 @@ use std::{
|
||||
};
|
||||
|
||||
use eyeball::Subscriber;
|
||||
use eyeball_im::VectorDiff;
|
||||
use matrix_sdk_base::{
|
||||
deserialized_responses::{AmbiguityChange, SyncTimelineEvent, TimelineEvent},
|
||||
event_cache::store::{EventCacheStoreError, EventCacheStoreLock},
|
||||
@@ -543,6 +544,7 @@ pub enum RoomEventCacheUpdate {
|
||||
},
|
||||
|
||||
/// The room has received new timeline events.
|
||||
// TODO: remove once `UpdateTimelineEvents` is stabilized
|
||||
AddTimelineEvents {
|
||||
/// All the new events that have been added to the room's timeline.
|
||||
events: Vec<SyncTimelineEvent>,
|
||||
@@ -551,6 +553,15 @@ pub enum RoomEventCacheUpdate {
|
||||
origin: EventsOrigin,
|
||||
},
|
||||
|
||||
/// The room has received updates for the timeline as _diffs_.
|
||||
UpdateTimelineEvents {
|
||||
/// Diffs to apply to the timeline.
|
||||
diffs: Vec<VectorDiff<SyncTimelineEvent>>,
|
||||
|
||||
/// Where the diffs are coming from.
|
||||
origin: EventsOrigin,
|
||||
},
|
||||
|
||||
/// The room has received new ephemeral events.
|
||||
AddEphemeralEvents {
|
||||
/// XXX: this is temporary, until read receipts are handled in the event
|
||||
|
||||
@@ -533,8 +533,8 @@ impl RoomEventCacheInner {
|
||||
|
||||
// Add the previous back-pagination token (if present), followed by the timeline
|
||||
// events themselves.
|
||||
{
|
||||
state
|
||||
let sync_timeline_events_diffs = {
|
||||
let sync_timeline_events_diffs = state
|
||||
.with_events_mut(|room_events| {
|
||||
if let Some(prev_token) = &prev_batch {
|
||||
room_events.push_gap(Gap { prev_token: prev_token.clone() });
|
||||
@@ -556,6 +556,8 @@ impl RoomEventCacheInner {
|
||||
.replace_gap_at([], prev_gap_id)
|
||||
.expect("we obtained the valid position beforehand");
|
||||
}
|
||||
|
||||
room_events.updates_as_vector_diffs()
|
||||
})
|
||||
.await?;
|
||||
|
||||
@@ -566,7 +568,9 @@ impl RoomEventCacheInner {
|
||||
cache.events.insert(event_id.to_owned(), (self.room_id.clone(), ev.clone()));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
sync_timeline_events_diffs
|
||||
};
|
||||
|
||||
// Now that all events have been added, we can trigger the
|
||||
// `pagination_token_notifier`.
|
||||
@@ -576,6 +580,7 @@ impl RoomEventCacheInner {
|
||||
|
||||
// The order of `RoomEventCacheUpdate`s is **really** important here.
|
||||
{
|
||||
// TODO: remove once `UpdateTimelineEvents` is stabilized.
|
||||
if !sync_timeline_events.is_empty() {
|
||||
let _ = self.sender.send(RoomEventCacheUpdate::AddTimelineEvents {
|
||||
events: sync_timeline_events,
|
||||
@@ -583,6 +588,13 @@ impl RoomEventCacheInner {
|
||||
});
|
||||
}
|
||||
|
||||
if !sync_timeline_events_diffs.is_empty() {
|
||||
let _ = self.sender.send(RoomEventCacheUpdate::UpdateTimelineEvents {
|
||||
diffs: sync_timeline_events_diffs,
|
||||
origin: EventsOrigin::Sync,
|
||||
});
|
||||
}
|
||||
|
||||
if !ephemeral_events.is_empty() {
|
||||
let _ = self
|
||||
.sender
|
||||
|
||||
@@ -84,6 +84,8 @@ async fn test_event_cache_receives_events() {
|
||||
assert_let_timeout!(
|
||||
Ok(RoomEventCacheUpdate::AddTimelineEvents { events, .. }) = subscriber.recv()
|
||||
);
|
||||
// It does also receive the update as `VectorDiff`.
|
||||
assert_let_timeout!(Ok(RoomEventCacheUpdate::UpdateTimelineEvents { .. }) = subscriber.recv());
|
||||
|
||||
// Which contains the event that was sent beforehand.
|
||||
assert_eq!(events.len(), 1);
|
||||
@@ -170,6 +172,8 @@ async fn test_ignored_unignored() {
|
||||
assert_let_timeout!(
|
||||
Ok(RoomEventCacheUpdate::AddTimelineEvents { events, .. }) = subscriber.recv()
|
||||
);
|
||||
// It does also receive the update as `VectorDiff`.
|
||||
assert_let_timeout!(Ok(RoomEventCacheUpdate::UpdateTimelineEvents { .. }) = subscriber.recv());
|
||||
assert_eq!(events.len(), 1);
|
||||
assert_event_matches_msg(&events[0], "i don't like this dexter");
|
||||
|
||||
@@ -197,6 +201,10 @@ async fn wait_for_initial_events(
|
||||
update = room_stream.recv().await.expect("read error");
|
||||
}
|
||||
assert_matches!(update, RoomEventCacheUpdate::AddTimelineEvents { .. });
|
||||
|
||||
let update = room_stream.recv().await.expect("read error");
|
||||
|
||||
assert_matches!(update, RoomEventCacheUpdate::UpdateTimelineEvents { .. });
|
||||
} else {
|
||||
assert_eq!(events.len(), 1);
|
||||
}
|
||||
@@ -810,6 +818,10 @@ async fn test_limited_timeline_with_storage() {
|
||||
assert_let_timeout!(
|
||||
Ok(RoomEventCacheUpdate::AddTimelineEvents { events, .. }) = subscriber.recv()
|
||||
);
|
||||
// It does also receive the update as `VectorDiff`.
|
||||
assert_let_timeout!(
|
||||
Ok(RoomEventCacheUpdate::UpdateTimelineEvents { .. }) = subscriber.recv()
|
||||
);
|
||||
assert_eq!(events.len(), 1);
|
||||
assert_event_matches_msg(&events[0], "hey yo");
|
||||
} else {
|
||||
@@ -832,6 +844,8 @@ async fn test_limited_timeline_with_storage() {
|
||||
assert_let_timeout!(
|
||||
Ok(RoomEventCacheUpdate::AddTimelineEvents { events, .. }) = subscriber.recv()
|
||||
);
|
||||
// It does also receive the update as `VectorDiff`.
|
||||
assert_let_timeout!(Ok(RoomEventCacheUpdate::UpdateTimelineEvents { .. }) = subscriber.recv());
|
||||
assert_eq!(events.len(), 1);
|
||||
assert_event_matches_msg(&events[0], "gappy!");
|
||||
|
||||
@@ -1058,6 +1072,9 @@ async fn test_no_gap_stored_after_deduplicated_sync() {
|
||||
if events.is_empty() {
|
||||
let update = stream.recv().await.expect("read error");
|
||||
assert_matches!(update, RoomEventCacheUpdate::AddTimelineEvents { .. });
|
||||
// It does also receive the update as `VectorDiff`.
|
||||
let update = stream.recv().await.expect("read error");
|
||||
assert_matches!(update, RoomEventCacheUpdate::UpdateTimelineEvents { .. });
|
||||
}
|
||||
drop(events);
|
||||
|
||||
@@ -1099,6 +1116,8 @@ async fn test_no_gap_stored_after_deduplicated_sync() {
|
||||
assert_event_matches_msg(&events[1], "world");
|
||||
assert_event_matches_msg(&events[2], "sup");
|
||||
assert_eq!(events.len(), 3);
|
||||
|
||||
assert!(stream.is_empty());
|
||||
}
|
||||
|
||||
#[async_test]
|
||||
@@ -1133,6 +1152,9 @@ async fn test_no_gap_stored_after_deduplicated_backpagination() {
|
||||
if events.is_empty() {
|
||||
let update = stream.recv().await.expect("read error");
|
||||
assert_matches!(update, RoomEventCacheUpdate::AddTimelineEvents { .. });
|
||||
// It does also receive the update as `VectorDiff`.
|
||||
let update = stream.recv().await.expect("read error");
|
||||
assert_matches!(update, RoomEventCacheUpdate::UpdateTimelineEvents { .. });
|
||||
}
|
||||
drop(events);
|
||||
|
||||
|
||||
Reference in New Issue
Block a user