test(sdk): Update tests to the new API.

This commit is contained in:
Ivan Enderlin
2026-05-01 14:22:16 +02:00
parent dad3a7fb7a
commit ef76aafdda
21 changed files with 457 additions and 454 deletions

View File

@@ -38,8 +38,7 @@ use matrix_sdk::{
#[cfg(test)]
use ruma::events::receipt::ReceiptEventContent;
use ruma::{
EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedTransactionId, RoomId, TransactionId,
UserId,
EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedTransactionId, TransactionId, UserId,
api::client::receipt::create_receipt::v3::ReceiptType as SendReceiptType,
events::{
AnyMessageLikeEventContent, AnySyncEphemeralRoomEvent, AnySyncMessageLikeEvent,
@@ -69,9 +68,9 @@ pub(super) use self::{
};
use super::{
DateDividerMode, EmbeddedEvent, Error, EventSendState, EventTimelineItem, InReplyToDetails,
MediaUploadProgress, PaginationError, Profile, TimelineDetails, TimelineEventItemId,
TimelineFocus, TimelineItem, TimelineItemContent, TimelineItemKind,
TimelineReadReceiptTracking, VirtualTimelineItem,
MediaUploadProgress, Profile, TimelineDetails, TimelineEventItemId, TimelineFocus,
TimelineItem, TimelineItemContent, TimelineItemKind, TimelineReadReceiptTracking,
VirtualTimelineItem,
algorithms::{rfind_event_by_id, rfind_event_item},
event_item::{ReactionStatus, RemoteEventOrigin},
item::TimelineUniqueId,

View File

@@ -50,7 +50,7 @@ use crate::timeline::{
#[async_test]
async fn test_initial_events() {
let timeline = TestTimeline::new();
let timeline = TestTimeline::new().await;
let mut stream = timeline.subscribe().await;
let f = &timeline.factory;
@@ -102,7 +102,8 @@ async fn test_replace_with_initial_events_and_read_marker() {
track_read_receipts: TimelineReadReceiptTracking::AllEvents,
..Default::default()
})
.build();
.build()
.await;
let f = &timeline.factory;
let ev = f.text_msg("hey").sender(*ALICE).into_event();
@@ -131,7 +132,7 @@ async fn test_replace_with_initial_events_and_read_marker() {
#[async_test]
async fn test_sticker() {
let timeline = TestTimeline::new();
let timeline = TestTimeline::new().await;
let mut stream = timeline.subscribe_events().await;
timeline
@@ -160,7 +161,7 @@ async fn test_sticker() {
#[async_test]
async fn test_room_member() {
let timeline = TestTimeline::new();
let timeline = TestTimeline::new().await;
let mut stream = timeline.subscribe_events().await;
// Bob invites Alice.
@@ -259,7 +260,7 @@ async fn test_room_member() {
#[async_test]
async fn test_other_state() {
let timeline = TestTimeline::new();
let timeline = TestTimeline::new().await;
let mut stream = timeline.subscribe().await;
let f = &timeline.factory;
@@ -287,7 +288,8 @@ async fn test_other_state() {
#[async_test]
async fn test_internal_id_prefix() {
let timeline = TestTimelineBuilder::new().internal_id_prefix("le_prefix_".to_owned()).build();
let timeline =
TestTimelineBuilder::new().internal_id_prefix("le_prefix_".to_owned()).build().await;
let f = &timeline.factory;
let ev_a = f.text_msg("A").sender(*ALICE).into_event();
@@ -323,7 +325,7 @@ async fn test_internal_id_prefix() {
#[async_test]
async fn test_internal_id_reuse() {
let timeline = TestTimelineBuilder::new().build();
let timeline = TestTimelineBuilder::new().build().await;
let f = &timeline.factory;
let ev_a = f.text_msg("A").sender(*ALICE).into_event();
@@ -393,7 +395,7 @@ async fn test_internal_id_reuse() {
#[async_test]
async fn test_sanitized() {
let timeline = TestTimeline::new();
let timeline = TestTimeline::new().await;
let mut stream = timeline.subscribe().await;
let f = &timeline.factory;
@@ -437,7 +439,7 @@ async fn test_sanitized() {
#[async_test]
async fn test_reply() {
let timeline = TestTimeline::new();
let timeline = TestTimeline::new().await;
let mut stream = timeline.subscribe().await;
let f = &timeline.factory;
@@ -497,7 +499,7 @@ async fn test_reply() {
#[async_test]
async fn test_thread() {
let timeline = TestTimeline::new();
let timeline = TestTimeline::new().await;
let mut stream = timeline.subscribe().await;
let f = &timeline.factory;
@@ -542,7 +544,8 @@ async fn test_replace_with_initial_events_when_batched() {
let timeline = TestTimelineBuilder::new()
.provider(TestRoomDataProvider::default())
.settings(TimelineSettings::default())
.build();
.build()
.await;
let f = &timeline.factory;
let ev = f.text_msg("hey").sender(*ALICE).into_event();

View File

@@ -36,7 +36,7 @@ use crate::timeline::{
#[async_test]
async fn test_remote_echo_full_trip() {
let timeline = TestTimeline::new();
let timeline = TestTimeline::new().await;
let mut stream = timeline.subscribe().await;
// Given a local event…
@@ -140,7 +140,7 @@ async fn test_remote_echo_full_trip() {
#[async_test]
async fn test_remote_echo_new_position() {
let timeline = TestTimeline::new();
let timeline = TestTimeline::new().await;
let mut stream = timeline.subscribe().await;
let f = &timeline.factory;
@@ -192,7 +192,7 @@ async fn test_remote_echo_new_position() {
#[async_test]
async fn test_date_divider_removed_after_local_echo_disappeared() {
let timeline = TestTimeline::new();
let timeline = TestTimeline::new().await;
let f = &timeline.factory;
@@ -244,7 +244,8 @@ async fn test_no_read_marker_with_local_echo() {
track_read_receipts: TimelineReadReceiptTracking::AllEvents,
..Default::default()
})
.build();
.build()
.await;
let f = &timeline.factory;
@@ -297,7 +298,7 @@ async fn test_no_read_marker_with_local_echo() {
#[async_test]
async fn test_no_reuse_of_counters() {
let timeline = TestTimeline::new();
let timeline = TestTimeline::new().await;
let mut stream = timeline.subscribe().await;
let now = MilliSecondsSinceUnixEpoch::now();

View File

@@ -32,7 +32,7 @@ use super::TestTimeline;
#[async_test]
async fn test_live_redacted() {
let timeline = TestTimeline::new();
let timeline = TestTimeline::new().await;
let mut stream = timeline.subscribe().await;
let f = &timeline.factory;
@@ -58,7 +58,7 @@ async fn test_live_redacted() {
#[async_test]
async fn test_live_sanitized() {
let timeline = TestTimeline::new();
let timeline = TestTimeline::new().await;
let mut stream = timeline.subscribe().await;
let f = &timeline.factory;
@@ -103,7 +103,7 @@ async fn test_live_sanitized() {
#[async_test]
async fn test_aggregated_sanitized() {
let timeline = TestTimeline::new();
let timeline = TestTimeline::new().await;
let mut stream = timeline.subscribe().await;
let original_event_id = event_id!("$original");
@@ -147,7 +147,7 @@ async fn test_aggregated_sanitized() {
#[async_test]
async fn test_edit_updates_encryption_info() {
let timeline = TestTimeline::new();
let timeline = TestTimeline::new().await;
let event_factory = &timeline.factory;
let room_id = room_id!("!room:id");
@@ -229,7 +229,7 @@ async fn test_edit_updates_encryption_info() {
#[async_test]
async fn test_relations_edit_overrides_pending_edit_msg() {
let timeline = TestTimeline::new();
let timeline = TestTimeline::new().await;
let mut stream = timeline.subscribe().await;
let f = &timeline.factory;
@@ -284,7 +284,7 @@ async fn test_relations_edit_overrides_pending_edit_msg() {
#[async_test]
async fn test_relations_edit_overrides_pending_edit_poll() {
let timeline = TestTimeline::new();
let timeline = TestTimeline::new().await;
let mut stream = timeline.subscribe().await;
let f = &timeline.factory;
@@ -355,7 +355,7 @@ async fn test_relations_edit_overrides_pending_edit_poll() {
#[async_test]
async fn test_updated_reply_doesnt_lose_latest_edit() {
let timeline = TestTimeline::new();
let timeline = TestTimeline::new().await;
let mut stream = timeline.subscribe_events().await;
let f = &timeline.factory;

View File

@@ -675,7 +675,7 @@ async fn test_retry_message_decryption_highlighted() {
#[async_test]
async fn test_utd_cause_for_nonmember_event_is_found() {
// Given a timeline
let timeline = TestTimeline::new();
let timeline = TestTimeline::new().await;
let mut stream = timeline.subscribe().await;
// When we add an event with "membership: leave"
@@ -696,7 +696,7 @@ async fn test_utd_cause_for_nonmember_event_is_found() {
#[async_test]
async fn test_utd_cause_for_nonmember_event_is_found_unstable_prefix() {
// Given a timeline
let timeline = TestTimeline::new();
let timeline = TestTimeline::new().await;
let mut stream = timeline.subscribe().await;
// When we add an event with "io.element.msc4115.membership: leave"
@@ -721,7 +721,7 @@ async fn test_utd_cause_for_nonmember_event_is_found_unstable_prefix() {
#[async_test]
async fn test_utd_cause_for_member_event_is_unknown() {
// Given a timeline
let timeline = TestTimeline::new();
let timeline = TestTimeline::new().await;
let mut stream = timeline.subscribe().await;
// When we add an event with "membership: join"
@@ -742,7 +742,7 @@ async fn test_utd_cause_for_member_event_is_unknown() {
#[async_test]
async fn test_utd_cause_for_missing_membership_is_unknown() {
// Given a timeline
let timeline = TestTimeline::new();
let timeline = TestTimeline::new().await;
let mut stream = timeline.subscribe().await;
// When we add an event with no membership in unsigned

View File

@@ -40,7 +40,7 @@ use crate::timeline::{
#[async_test]
async fn test_default_filter() {
let timeline = TestTimeline::new();
let timeline = TestTimeline::new().await;
let mut stream = timeline.subscribe().await;
let f = &timeline.factory;
@@ -101,7 +101,8 @@ async fn test_default_filter() {
async fn test_filter_always_false() {
let timeline = TestTimelineBuilder::new()
.settings(TimelineSettings { event_filter: Arc::new(|_, _| false), ..Default::default() })
.build();
.build()
.await;
let f = &timeline.factory;
timeline.handle_live_event(f.text_msg("The first message").sender(&ALICE)).await;
@@ -123,7 +124,8 @@ async fn test_custom_filter() {
event_filter: Arc::new(|ev, _| matches!(ev, AnySyncTimelineEvent::MessageLike(_))),
..Default::default()
})
.build();
.build()
.await;
let mut stream = timeline.subscribe().await;
let f = &timeline.factory;
@@ -149,7 +151,8 @@ async fn test_custom_filter_for_custom_msglike_event() {
event_filter: Arc::new(|ev, _| matches!(ev, AnySyncTimelineEvent::MessageLike(_))),
..Default::default()
})
.build();
.build()
.await;
let mut stream = timeline.subscribe().await;
let f = &timeline.factory;
@@ -170,7 +173,8 @@ async fn test_custom_filter_for_custom_msglike_event() {
async fn test_hide_failed_to_parse() {
let timeline = TestTimelineBuilder::new()
.settings(TimelineSettings { add_failed_to_parse: false, ..Default::default() })
.build();
.build()
.await;
// m.room.message events must have a msgtype and body in content, so this
// event with an empty content object should fail to deserialize.
@@ -212,7 +216,8 @@ async fn test_event_filter_include_only_room_names() {
event_filter: Arc::new(move |event, _| event_filter.filter(event)),
..Default::default()
})
.build();
.build()
.await;
let f = &timeline.factory;
// Add a non-encrypted message event
@@ -246,7 +251,8 @@ async fn test_event_filter_exclude_messages() {
event_filter: Arc::new(move |event, _| event_filter.filter(event)),
..Default::default()
})
.build();
.build()
.await;
let f = &timeline.factory;
// Add a message event
@@ -278,7 +284,8 @@ async fn test_event_filter_include_only_membership_changes() {
event_filter: Arc::new(move |event, _| event_filter.filter(event)),
..Default::default()
})
.build();
.build()
.await;
let f = &timeline.factory;
// Add Alice's join event
@@ -330,7 +337,8 @@ async fn test_event_filter_include_only_profile_changes() {
event_filter: Arc::new(move |event, _| event_filter.filter(event)),
..Default::default()
})
.build();
.build()
.await;
let f = &timeline.factory;
// Add Alice's join event
@@ -385,7 +393,8 @@ async fn test_event_filter_include_only_messages_and_membership_changes() {
event_filter: Arc::new(move |event, _| event_filter.filter(event)),
..Default::default()
})
.build();
.build()
.await;
let f = &timeline.factory;
// Add Alice's join event
@@ -437,7 +446,8 @@ async fn test_event_filter_exclude_membership_changes() {
event_filter: Arc::new(move |event, _| event_filter.filter(event)),
..Default::default()
})
.build();
.build()
.await;
let f = &timeline.factory;
// Add Alice's join event
@@ -489,7 +499,8 @@ async fn test_event_filter_exclude_profile_changes() {
event_filter: Arc::new(move |event, _| event_filter.filter(event)),
..Default::default()
})
.build();
.build()
.await;
let f = &timeline.factory;
// Add Alice's join event
@@ -545,7 +556,8 @@ async fn test_event_filter_exclude_messages_and_membership_changes() {
event_filter: Arc::new(move |event, _| event_filter.filter(event)),
..Default::default()
})
.build();
.build()
.await;
let f = &timeline.factory;
// Add Alice's join event

View File

@@ -28,7 +28,7 @@ use crate::timeline::TimelineItemContent;
#[async_test]
async fn test_invalid_edit() {
let timeline = TestTimeline::new();
let timeline = TestTimeline::new().await;
let mut stream = timeline.subscribe_events().await;
let f = &timeline.factory;
@@ -55,7 +55,7 @@ async fn test_invalid_edit() {
#[async_test]
async fn test_invalid_event_content() {
let timeline = TestTimeline::new();
let timeline = TestTimeline::new().await;
let mut stream = timeline.subscribe_events().await;
// m.room.message events must have a msgtype and body in content, so this
@@ -103,7 +103,7 @@ async fn test_invalid_event_content() {
#[async_test]
async fn test_invalid_event() {
let timeline = TestTimeline::new();
let timeline = TestTimeline::new().await;
// This event is missing the sender field which the homeserver must add to
// all timeline events. Because the event is malformed, it will be ignored.

View File

@@ -35,7 +35,7 @@ use crate::timeline::{
/// item with `is_live() == true` and no accumulated locations.
#[async_test]
async fn test_beacon_info_creates_timeline_item() {
let timeline = TestTimeline::new();
let timeline = TestTimeline::new().await;
let mut stream = timeline.subscribe_events().await;
let beacon_id = event_id!("$beacon_info:example.org");
@@ -64,7 +64,7 @@ async fn test_beacon_info_creates_timeline_item() {
/// item (we check `live`, not `is_live()` which would return `false`).
#[async_test]
async fn test_beacon_info_with_expired_timeout_still_creates_item() {
let timeline = TestTimeline::new();
let timeline = TestTimeline::new().await;
let mut stream = timeline.subscribe_events().await;
let beacon_id = event_id!("$beacon_info:example.org");
@@ -91,7 +91,7 @@ async fn test_beacon_info_with_expired_timeout_still_creates_item() {
/// `beacon_info` produces no timeline item (there is nothing to stop).
#[async_test]
async fn test_beacon_info_stopped_without_start_produces_no_item() {
let timeline = TestTimeline::new();
let timeline = TestTimeline::new().await;
let mut stream = timeline.subscribe_events().await;
let beacon_id = event_id!("$beacon_stop:example.org");
@@ -109,7 +109,7 @@ async fn test_beacon_info_stopped_without_start_produces_no_item() {
/// timeline item.
#[async_test]
async fn test_beacon_update_aggregates_onto_beacon_info() {
let timeline = TestTimeline::new();
let timeline = TestTimeline::new().await;
let mut stream = timeline.subscribe_events().await;
let beacon_id = event_id!("$beacon_info:example.org");
@@ -142,7 +142,7 @@ async fn test_beacon_update_aggregates_onto_beacon_info() {
/// Multiple location updates accumulate in timestamp order.
#[async_test]
async fn test_multiple_beacon_updates_accumulate_in_order() {
let timeline = TestTimeline::new();
let timeline = TestTimeline::new().await;
let mut stream = timeline.subscribe_events().await;
let beacon_id = event_id!("$beacon_info:example.org");
@@ -181,7 +181,7 @@ async fn test_multiple_beacon_updates_accumulate_in_order() {
/// state event, the aggregation is stashed and applied once the parent appears.
#[async_test]
async fn test_beacon_update_before_beacon_info_is_applied_when_parent_arrives() {
let timeline = TestTimeline::new();
let timeline = TestTimeline::new().await;
let mut stream = timeline.subscribe_events().await;
let beacon_id: OwnedEventId = owned_event_id!("$beacon:example.org");
@@ -216,7 +216,7 @@ async fn test_beacon_update_before_beacon_info_is_applied_when_parent_arrives()
/// `Beacon` timeline items.
#[async_test]
async fn test_multiple_users_sharing_produce_independent_items() {
let timeline = TestTimeline::new();
let timeline = TestTimeline::new().await;
let mut stream = timeline.subscribe_events().await;
let alice_beacon_id = event_id!("$alice_beacon:example.org");
let bob_beacon_id = event_id!("$bob_beacon:example.org");
@@ -292,7 +292,7 @@ async fn test_multiple_users_sharing_produce_independent_items() {
/// it is silently aggregated (or stashed).
#[async_test]
async fn test_beacon_update_not_shown_standalone() {
let timeline = TestTimeline::new();
let timeline = TestTimeline::new().await;
let mut stream = timeline.subscribe_events().await;
let beacon_id = event_id!("$some_beacon:example.org");
@@ -311,7 +311,7 @@ async fn test_beacon_update_not_shown_standalone() {
/// in-place rather than creating a new timeline item.
#[async_test]
async fn test_beacon_stop_updates_existing_item() {
let timeline = TestTimeline::new();
let timeline = TestTimeline::new().await;
let mut stream = timeline.subscribe_events().await;
let start_id = event_id!("$beacon_start:example.org");
let stop_id = event_id!("$beacon_stop:example.org");
@@ -349,7 +349,7 @@ async fn test_beacon_stop_updates_existing_item() {
/// existing item.
#[async_test]
async fn test_beacon_stop_preserves_locations() {
let timeline = TestTimeline::new();
let timeline = TestTimeline::new().await;
let mut stream = timeline.subscribe_events().await;
let start_id = event_id!("$beacon_start:example.org");
let stop_id = event_id!("$beacon_stop:example.org");
@@ -387,7 +387,7 @@ async fn test_beacon_stop_preserves_locations() {
/// item should be non-live from the moment it first appears.
#[async_test]
async fn test_beacon_stop_before_start_is_applied_later() {
let timeline = TestTimeline::new();
let timeline = TestTimeline::new().await;
let mut stream = timeline.subscribe_events().await;
let start_id = event_id!("$beacon_start:example.org");
let stop_id = event_id!("$beacon_stop:example.org");
@@ -430,7 +430,7 @@ async fn test_beacon_stop_before_start_is_applied_later() {
/// 3. User starts session B — the stashed stop should NOT apply
#[async_test]
async fn test_pending_beacon_stop_not_applied_to_different_session() {
let timeline = TestTimeline::new();
let timeline = TestTimeline::new().await;
let mut stream = timeline.subscribe_events().await;
let old_stop_id = event_id!("$old_stop:example.org");
let new_start_id = event_id!("$new_start:example.org");
@@ -486,7 +486,7 @@ async fn test_pending_beacon_stop_not_applied_to_different_session() {
/// Duplicate beacon location updates (same timestamp) are de-duplicated.
#[async_test]
async fn test_duplicate_beacon_location_is_deduplicated() {
let timeline = TestTimeline::new();
let timeline = TestTimeline::new().await;
let mut stream = timeline.subscribe_events().await;
let beacon_id = event_id!("$beacon_info:example.org");
@@ -517,7 +517,7 @@ async fn test_duplicate_beacon_location_is_deduplicated() {
/// A redacted `beacon_info` event produces a redacted item (not a Beacon item).
#[async_test]
async fn test_redacted_beacon_info_produces_redacted_item() {
let timeline = TestTimeline::new();
let timeline = TestTimeline::new().await;
let mut stream = timeline.subscribe_events().await;
timeline
@@ -541,7 +541,7 @@ async fn test_redacted_beacon_info_produces_redacted_item() {
/// via `reactions()`.
#[async_test]
async fn test_reaction_on_live_location_item() {
let timeline = TestTimeline::new();
let timeline = TestTimeline::new().await;
let mut stream = timeline.subscribe_events().await;
let beacon_id = event_id!("$beacon_info:example.org");
@@ -570,7 +570,7 @@ async fn test_reaction_on_live_location_item() {
/// location item.
#[async_test]
async fn test_multiple_reactions_on_live_location_item() {
let timeline = TestTimeline::new();
let timeline = TestTimeline::new().await;
let mut stream = timeline.subscribe_events().await;
let beacon_id = event_id!("$beacon_info:example.org");
@@ -599,7 +599,7 @@ async fn test_multiple_reactions_on_live_location_item() {
/// and applied once the beacon_info item is inserted.
#[async_test]
async fn test_reaction_before_live_location_item_is_applied_when_parent_arrives() {
let timeline = TestTimeline::new();
let timeline = TestTimeline::new().await;
let mut stream = timeline.subscribe_events().await;
let beacon_id = event_id!("$beacon_info:example.org");
@@ -627,7 +627,7 @@ async fn test_reaction_before_live_location_item_is_applied_when_parent_arrives(
/// and is then confirmed by the remote echo from sync.
#[async_test]
async fn test_local_reaction_on_live_location_item() {
let timeline = TestTimeline::new();
let timeline = TestTimeline::new().await;
let mut stream = timeline.subscribe_events().await;
let beacon_id = event_id!("$beacon_info:example.org");

View File

@@ -27,22 +27,25 @@ use futures_core::Stream;
use imbl::vector;
use indexmap::IndexMap;
use matrix_sdk::{
Client,
deserialized_responses::TimelineEvent,
paginators::{PaginableRoom, PaginatorError, thread::PaginableThread},
room::{EventWithContextResponse, Messages, MessagesOptions, Relations},
send_queue::RoomSendQueueUpdate,
test_utils::mocks::MatrixMockServer,
};
use matrix_sdk_base::{RoomInfo, RoomState, crypto::types::events::CryptoContextInfo};
use matrix_sdk_test::{ALICE, DEFAULT_TEST_ROOM_ID, event_factory::EventFactory};
use ruma::{
EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedTransactionId, OwnedUserId,
TransactionId, UInt, UserId,
EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedRoomId, OwnedTransactionId,
OwnedUserId, RoomId, TransactionId, UInt, UserId,
events::{
AnyMessageLikeEventContent, AnyTimelineEvent,
reaction::ReactionEventContent,
receipt::{Receipt, ReceiptThread, ReceiptType},
relation::Annotation,
},
room_id,
room_version_rules::RoomVersionRules,
serde::Raw,
};
@@ -110,20 +113,35 @@ impl TestTimelineBuilder {
self
}
fn build(self) -> TestTimeline {
async fn build(self) -> TestTimeline {
let room_data_provider = self.provider.unwrap_or_default();
// Create the room for the event cache.
let server = MatrixMockServer::new().await;
let client = server.client_builder().build().await;
let _room = server.sync_joined_room(&client, room_data_provider.room_id()).await;
let event_cache = client.event_cache();
event_cache.subscribe().unwrap();
let controller = TimelineController::new(
self.provider.unwrap_or_default(),
self.focus.unwrap_or(TimelineFocus::Live { hide_threaded_events: false }),
room_data_provider,
&self.focus.unwrap_or(TimelineFocus::Live { hide_threaded_events: false }),
event_cache,
self.internal_id_prefix,
self.utd_hook,
self.is_room_encrypted,
self.settings.unwrap_or_default(),
);
TestTimeline { controller, factory: EventFactory::new() }
)
.await
.unwrap();
TestTimeline { _client: client, controller, factory: EventFactory::new() }
}
}
struct TestTimeline {
_client: Client,
controller: TimelineController<TestRoomDataProvider>,
/// An [`EventFactory`] that can be used for creating events in this
@@ -132,8 +150,8 @@ struct TestTimeline {
}
impl TestTimeline {
fn new() -> Self {
TestTimelineBuilder::new().build()
async fn new() -> Self {
TestTimelineBuilder::new().build().await
}
/// Returns the associated inner data from that [`TestTimeline`].
@@ -239,6 +257,9 @@ type ReadReceiptMap =
#[derive(Clone, Debug, Default)]
struct TestRoomDataProvider {
/// The room ID.
room_id: Option<OwnedRoomId>,
/// The ID of our own user.
own_user_id: Option<OwnedUserId>,
@@ -304,6 +325,10 @@ impl PaginableThread for TestRoomDataProvider {
}
impl RoomDataProvider for TestRoomDataProvider {
fn room_id(&self) -> &RoomId {
self.room_id.as_deref().unwrap_or(room_id!("!r0"))
}
fn own_user_id(&self) -> &UserId {
self.own_user_id.as_deref().unwrap_or(&ALICE)
}

View File

@@ -14,7 +14,7 @@ use crate::timeline::{EventTimelineItem, event_item::PollState, tests::TestTimel
#[async_test]
async fn test_poll_is_displayed() {
let timeline = TestTimeline::new();
let timeline = TestTimeline::new().await;
timeline.send_poll_start(&ALICE, fakes::poll_a()).await;
let poll_state = timeline.poll_state().await;
@@ -25,7 +25,7 @@ async fn test_poll_is_displayed() {
#[async_test]
async fn test_edited_poll_is_displayed() {
let timeline = TestTimeline::new();
let timeline = TestTimeline::new().await;
timeline.send_poll_start(&ALICE, fakes::poll_a()).await;
let event = timeline.poll_event().await;
@@ -42,7 +42,7 @@ async fn test_edited_poll_is_displayed() {
#[async_test]
async fn test_voting_adds_the_vote_to_the_results() {
let timeline = TestTimeline::new();
let timeline = TestTimeline::new().await;
timeline.send_poll_start(&ALICE, fakes::poll_a()).await;
let poll_id = timeline.poll_event().await.event_id().unwrap().to_owned();
@@ -56,7 +56,7 @@ async fn test_voting_adds_the_vote_to_the_results() {
#[async_test]
async fn test_ending_a_poll_sets_end_time_to_results() {
let timeline = TestTimeline::new();
let timeline = TestTimeline::new().await;
timeline.send_poll_start(&ALICE, fakes::poll_a()).await;
let poll_id = timeline.poll_event().await.event_id().unwrap().to_owned();
@@ -69,7 +69,7 @@ async fn test_ending_a_poll_sets_end_time_to_results() {
#[async_test]
async fn test_only_the_last_vote_from_a_user_is_counted() {
let timeline = TestTimeline::new();
let timeline = TestTimeline::new().await;
timeline.send_poll_start(&ALICE, fakes::poll_a()).await;
let poll_id = timeline.poll_event().await.event_id().unwrap().to_owned();
@@ -86,7 +86,7 @@ async fn test_only_the_last_vote_from_a_user_is_counted() {
#[async_test]
async fn test_votes_after_end_are_discarded() {
let timeline = TestTimeline::new();
let timeline = TestTimeline::new().await;
timeline.send_poll_start(&ALICE, fakes::poll_a()).await;
let poll_id = timeline.poll_event().await.event_id().unwrap().to_owned();
@@ -103,7 +103,7 @@ async fn test_votes_after_end_are_discarded() {
#[async_test]
async fn test_multiple_end_events_are_discarded() {
let timeline = TestTimeline::new();
let timeline = TestTimeline::new().await;
timeline.send_poll_start(&ALICE, fakes::poll_a()).await;
let poll_id = timeline.poll_event().await.event_id().unwrap().to_owned();
@@ -123,7 +123,7 @@ async fn test_multiple_end_events_are_discarded() {
#[async_test]
async fn test_a_somewhat_complex_voting_session_yields_the_expected_outcome() {
let timeline = TestTimeline::new();
let timeline = TestTimeline::new().await;
timeline.send_poll_start(&ALICE, fakes::poll_a()).await;
let poll_id = timeline.poll_event().await.event_id().unwrap().to_owned();
@@ -155,7 +155,7 @@ async fn test_a_somewhat_complex_voting_session_yields_the_expected_outcome() {
#[async_test]
async fn test_events_received_before_start_are_not_lost() {
let timeline = TestTimeline::new();
let timeline = TestTimeline::new().await;
let poll_id: OwnedEventId = EventId::new_v1(server_name!("dummy.server"));
// Alice votes
@@ -184,7 +184,7 @@ async fn test_events_received_before_start_are_not_lost() {
#[async_test]
async fn test_adding_response_doesnt_clear_latest_json_edit() {
let timeline = TestTimeline::new();
let timeline = TestTimeline::new().await;
// Alice sends the poll.
timeline.send_poll_start(&ALICE, fakes::poll_a()).await;
@@ -205,7 +205,7 @@ async fn test_adding_response_doesnt_clear_latest_json_edit() {
#[async_test]
async fn test_ending_poll_doesnt_clear_latest_json_edit() {
let timeline = TestTimeline::new();
let timeline = TestTimeline::new().await;
// Alice sends the poll.
timeline.send_poll_start(&ALICE, fakes::poll_a()).await;
@@ -227,7 +227,7 @@ async fn test_ending_poll_doesnt_clear_latest_json_edit() {
#[async_test]
async fn test_poll_contains_relations() {
let timeline = TestTimeline::new();
let timeline = TestTimeline::new().await;
let thread_root_id = event_id!("$thread_root");
let in_reply_to_id = event_id!("$in_reply_to");

View File

@@ -82,7 +82,7 @@ macro_rules! assert_reaction_is_updated {
#[async_test]
async fn test_add_reaction_on_non_existent_event() {
let timeline = TestTimeline::new();
let timeline = TestTimeline::new().await;
let mut stream = timeline.subscribe().await;
let event_id = EventId::parse("$nonexisting_unique_id").unwrap();
@@ -96,7 +96,7 @@ async fn test_add_reaction_on_non_existent_event() {
#[async_test]
async fn test_add_reaction_success() {
let timeline = TestTimeline::new();
let timeline = TestTimeline::new().await;
let mut stream = timeline.subscribe().await;
let (item_id, event_id, item_pos) = send_first_message(&timeline, &mut stream).await;
@@ -126,7 +126,7 @@ async fn test_add_reaction_success() {
#[async_test]
async fn test_redact_reaction_success() {
let timeline = TestTimeline::new();
let timeline = TestTimeline::new().await;
let f = &timeline.factory;
let mut stream = timeline.subscribe().await;
@@ -172,7 +172,7 @@ async fn test_redact_reaction_success() {
#[async_test]
async fn test_reactions_store_timestamp() {
let timeline = TestTimeline::new();
let timeline = TestTimeline::new().await;
let mut stream = timeline.subscribe().await;
let (item_id, event_id, msg_pos) = send_first_message(&timeline, &mut stream).await;
@@ -192,7 +192,7 @@ async fn test_reactions_store_timestamp() {
#[async_test]
async fn test_initial_reaction_timestamp_is_stored() {
let timeline = TestTimeline::new();
let timeline = TestTimeline::new().await;
let f = &timeline.factory;
let message_event_id = EventId::new_v1(server_name!("dummy.server"));
@@ -254,7 +254,7 @@ async fn send_first_message(
async fn test_reinserted_item_keeps_reactions() {
// This test checks that after deduplicating events, the reactions attached to
// the deduplicated event are not lost.
let timeline = TestTimeline::new();
let timeline = TestTimeline::new().await;
let f = &timeline.factory;
// We receive an initial update with one event and a reaction to this event.

View File

@@ -56,7 +56,8 @@ async fn test_read_receipts_updates_on_live_events() {
track_read_receipts: TimelineReadReceiptTracking::AllEvents,
..Default::default()
})
.build();
.build()
.await;
let mut stream = timeline.subscribe().await;
let f = &timeline.factory;
@@ -121,7 +122,8 @@ async fn test_read_receipts_updates_on_back_paginated_events() {
track_read_receipts: TimelineReadReceiptTracking::AllEvents,
..Default::default()
})
.build();
.build()
.await;
let room_id = room_id!("!room:localhost");
let f = EventFactory::new().room(room_id);
@@ -163,7 +165,8 @@ async fn test_read_receipts_updates_on_filtered_events() {
event_filter: Arc::new(filter_notice),
..Default::default()
})
.build();
.build()
.await;
let mut stream = timeline.subscribe().await;
let f = &timeline.factory;
@@ -264,7 +267,8 @@ async fn test_read_receipts_updates_on_filtered_events_with_stored() {
event_filter: Arc::new(filter_notice),
..Default::default()
})
.build();
.build()
.await;
let f = &timeline.factory;
let mut stream = timeline.subscribe().await;
@@ -334,7 +338,8 @@ async fn test_read_receipts_updates_on_back_paginated_filtered_events() {
event_filter: Arc::new(filter_notice),
..Default::default()
})
.build();
.build()
.await;
let mut stream = timeline.subscribe().await;
let room_id = room_id!("!room:localhost");
@@ -547,7 +552,8 @@ async fn test_initial_public_unthreaded_receipt() {
track_read_receipts: TimelineReadReceiptTracking::AllEvents,
..Default::default()
})
.build();
.build()
.await;
let (receipt_event_id, _) = timeline.controller.latest_user_read_receipt(*ALICE).await.unwrap();
assert_eq!(receipt_event_id, event_id);
@@ -575,7 +581,8 @@ async fn test_initial_public_main_thread_receipt() {
track_read_receipts: TimelineReadReceiptTracking::AllEvents,
..Default::default()
})
.build();
.build()
.await;
let (receipt_event_id, _) = timeline.controller.latest_user_read_receipt(*ALICE).await.unwrap();
assert_eq!(receipt_event_id, event_id);
@@ -603,7 +610,8 @@ async fn test_initial_private_unthreaded_receipt() {
track_read_receipts: TimelineReadReceiptTracking::AllEvents,
..Default::default()
})
.build();
.build()
.await;
let (receipt_event_id, _) = timeline.controller.latest_user_read_receipt(*ALICE).await.unwrap();
assert_eq!(receipt_event_id, event_id);
@@ -631,7 +639,8 @@ async fn test_initial_private_main_thread_receipt() {
track_read_receipts: TimelineReadReceiptTracking::AllEvents,
..Default::default()
})
.build();
.build()
.await;
let (receipt_event_id, _) = timeline.controller.latest_user_read_receipt(*ALICE).await.unwrap();
assert_eq!(receipt_event_id, event_id);
@@ -648,7 +657,8 @@ async fn test_clear_read_receipts() {
track_read_receipts: TimelineReadReceiptTracking::AllEvents,
..Default::default()
})
.build();
.build()
.await;
let f = &timeline.factory;
let event_a_content = RoomMessageEventContent::text_plain("A");
@@ -731,7 +741,8 @@ async fn test_implicit_read_receipt_before_explicit_read_receipt() {
track_read_receipts: TimelineReadReceiptTracking::AllEvents,
..Default::default()
})
.build();
.build()
.await;
// Check that the receipts are at the correct place.
let (receipt_event_id, _) = timeline.controller.latest_user_read_receipt(*ALICE).await.unwrap();
@@ -796,7 +807,8 @@ async fn test_threaded_latest_user_read_receipt() {
track_read_receipts: TimelineReadReceiptTracking::AllEvents,
..Default::default()
})
.build();
.build()
.await;
// Sanity check: no read receipts before any events.
assert!(timeline.controller.latest_user_read_receipt(*ALICE).await.is_none());
@@ -871,7 +883,8 @@ async fn test_unthreaded_client_updates_threaded_read_receipts() {
..Default::default()
})
.focus(TimelineFocus::Live { hide_threaded_events: true })
.build();
.build()
.await;
let mut stream = timeline.subscribe().await;
let event_b = event_id!("$event_b");

View File

@@ -35,7 +35,7 @@ use crate::timeline::{
#[async_test]
async fn test_redact_state_event() {
let timeline = TestTimeline::new();
let timeline = TestTimeline::new().await;
let mut stream = timeline.subscribe_events().await;
let f = &timeline.factory;
@@ -61,7 +61,7 @@ async fn test_redact_state_event() {
#[async_test]
async fn test_redact_replied_to_event() {
let timeline = TestTimeline::new();
let timeline = TestTimeline::new().await;
let mut stream = timeline.subscribe_events().await;
let f = &timeline.factory;
@@ -100,7 +100,7 @@ async fn test_redact_replied_to_event() {
#[async_test]
async fn test_redaction_before_event() {
let timeline = TestTimeline::new();
let timeline = TestTimeline::new().await;
let mut stream = timeline.subscribe_events().await;
let f = &timeline.factory;
@@ -136,7 +136,7 @@ async fn test_redaction_before_event() {
#[async_test]
async fn test_reaction_redaction() {
let timeline = TestTimeline::new();
let timeline = TestTimeline::new().await;
let mut stream = timeline.subscribe_events().await;
let f = &timeline.factory;
@@ -162,7 +162,7 @@ async fn test_reaction_redaction() {
#[async_test]
async fn test_reaction_redaction_timeline_filter() {
let timeline = TestTimeline::new();
let timeline = TestTimeline::new().await;
let mut stream = timeline.subscribe_events().await;
let f = &timeline.factory;
@@ -207,7 +207,7 @@ async fn test_reaction_redaction_timeline_filter() {
#[async_test]
async fn test_local_and_remote_echo_of_redaction() {
let timeline = TestTimeline::new();
let timeline = TestTimeline::new().await;
let mut stream = timeline.subscribe_events().await;
let f = &timeline.factory;

View File

@@ -29,7 +29,7 @@ use crate::timeline::{
#[async_test]
async fn test_no_shield_in_unencrypted_room() {
let timeline = TestTimeline::new();
let timeline = TestTimeline::new().await;
let mut stream = timeline.subscribe().await;
let f = &timeline.factory;
@@ -42,7 +42,7 @@ async fn test_no_shield_in_unencrypted_room() {
#[async_test]
async fn test_sent_in_clear_shield() {
let timeline = TestTimelineBuilder::new().room_encrypted(true).build();
let timeline = TestTimelineBuilder::new().room_encrypted(true).build().await;
let mut stream = timeline.subscribe().await;
let f = &timeline.factory;
@@ -64,7 +64,7 @@ async fn test_sent_in_clear_shield() {
/// sure the shield only appears once the remote echo is received.
async fn test_local_sent_in_clear_shield() {
// Given an encrypted timeline.
let timeline = TestTimelineBuilder::new().room_encrypted(true).build();
let timeline = TestTimelineBuilder::new().room_encrypted(true).build().await;
let mut stream = timeline.subscribe().await;
// When sending an unencrypted event.
@@ -143,7 +143,7 @@ async fn test_local_sent_in_clear_shield() {
/// Once a beacon location update arrives without encryption info (i.e. it was
/// sent in clear), the shield must switch to `SentInClear`.
async fn test_live_location_no_sent_in_clear_shield() {
let timeline = TestTimelineBuilder::new().room_encrypted(true).build();
let timeline = TestTimelineBuilder::new().room_encrypted(true).build().await;
let mut stream = timeline.subscribe_events().await;
let beacon_id = event_id!("$beacon_info:example.org");
@@ -230,7 +230,7 @@ async fn test_live_location_no_sent_in_clear_shield() {
/// sent in clear` red warning.
async fn test_utd_shield() {
// Given we are in an encrypted room
let timeline = TestTimelineBuilder::new().room_encrypted(true).build();
let timeline = TestTimelineBuilder::new().room_encrypted(true).build().await;
let mut stream = timeline.subscribe().await;
let f = &timeline.factory;

View File

@@ -29,7 +29,7 @@ use crate::timeline::{VirtualTimelineItem, traits::RoomDataProvider as _};
#[async_test]
async fn test_date_divider() {
let timeline = TestTimeline::new();
let timeline = TestTimeline::new().await;
let mut stream = timeline.subscribe().await;
let f = &timeline.factory;
@@ -91,7 +91,7 @@ async fn test_date_divider() {
#[async_test]
async fn test_update_read_marker() {
let timeline = TestTimeline::new();
let timeline = TestTimeline::new().await;
let mut stream = timeline.subscribe().await;
let own_user = timeline.controller.room_data_provider.own_user_id().to_owned();

View File

@@ -26,7 +26,7 @@ use matrix_sdk::{
};
use matrix_sdk_test::{ALICE, BOB, JoinedRoomBuilder, async_test, event_factory::EventFactory};
use matrix_sdk_ui::timeline::{
RoomExt as _, TimelineBuilder, TimelineDetails, TimelineEventFocusThreadMode,
EventSendState, RoomExt as _, TimelineBuilder, TimelineDetails, TimelineEventFocusThreadMode,
TimelineEventItemId, TimelineFocus, VirtualTimelineItem,
};
use ruma::{
@@ -304,6 +304,8 @@ async fn test_extract_bundled_thread_summary() {
assert_let!(VectorDiff::PushFront { value } = &timeline_updates[1]);
assert!(value.is_date_divider());
assert_pending!(stream);
}
#[async_test]
@@ -513,7 +515,7 @@ async fn test_thread_msg_edit_reflects_in_summary() {
.await;
assert_let_timeout!(Some(timeline_updates) = stream.next());
// Thread root+new implicit read receipt + new thread summary + day divider.
// Thread root + new implicit read receipt + new thread summary + day divider.
// TODO: could we optimize this, to have only a single timeline update?
assert_eq!(timeline_updates.len(), 4);
@@ -536,8 +538,12 @@ async fn test_thread_msg_edit_reflects_in_summary() {
// Now, with Bob's read receipt.
assert_eq!(event_item.read_receipts().len(), 2);
// The day divider is added.
assert_let!(VectorDiff::PushFront { value } = &timeline_updates[2]);
assert!(value.is_date_divider());
// Eventually the summary comes in.
assert_let!(VectorDiff::Set { index: 0, value } = &timeline_updates[2]);
assert_let!(VectorDiff::Set { index: 1, value } = &timeline_updates[3]);
let event_item = value.as_event().unwrap();
assert_eq!(event_item.event_id().unwrap(), thread_event_id);
// Now there is a summary!
@@ -549,10 +555,6 @@ async fn test_thread_msg_edit_reflects_in_summary() {
);
assert_eq!(latest_event.content.as_message().unwrap().body(), "threaded reply");
assert_eq!(summary.num_replies, 1);
// And finally, the day divider.
assert_let!(VectorDiff::PushFront { value } = &timeline_updates[3]);
assert!(value.is_date_divider());
}
// When I receive an edit to that event, via sync,
@@ -678,8 +680,12 @@ async fn test_thread_poll_edit_reflects_in_summary() {
// Now, with Bob's read receipt.
assert_eq!(event_item.read_receipts().len(), 2);
// The day divider is added.
assert_let!(VectorDiff::PushFront { value } = &timeline_updates[2]);
assert!(value.is_date_divider());
// Eventually the summary comes in.
assert_let!(VectorDiff::Set { index: 0, value } = &timeline_updates[2]);
assert_let!(VectorDiff::Set { index: 1, value } = &timeline_updates[3]);
let event_item = value.as_event().unwrap();
assert_eq!(event_item.event_id().unwrap(), thread_event_id);
// Now there is a summary!
@@ -696,10 +702,6 @@ async fn test_thread_poll_edit_reflects_in_summary() {
assert_eq!(summary.num_replies, 1);
// And finally, the day divider.
assert_let!(VectorDiff::PushFront { value } = &timeline_updates[3]);
assert!(value.is_date_divider());
// That's all, folks!
assert_pending!(stream);
}
@@ -780,13 +782,13 @@ async fn test_thread_filtering_for_sync() {
assert_eq!(event_item.content().as_message().unwrap().body(), "Thread root");
assert_matches!(event_item.content().thread_summary(), None);
// The item gets a thread summary.
assert_let!(VectorDiff::Set { index: 0, value } = &timeline_updates[1]);
assert_matches!(value.as_event().unwrap().content().thread_summary(), Some(_));
assert_let!(VectorDiff::PushFront { value } = &timeline_updates[2]);
assert_let!(VectorDiff::PushFront { value } = &timeline_updates[1]);
assert!(value.is_date_divider());
// The item gets a thread summary.
assert_let!(VectorDiff::Set { index: 1, value } = &timeline_updates[2]);
assert_matches!(value.as_event().unwrap().content().thread_summary(), Some(_));
assert_pending!(filtered_timeline_stream);
}
@@ -814,23 +816,21 @@ async fn test_thread_filtering_for_sync() {
"Within thread"
);
// The thread summary gets updated:
assert_let!(VectorDiff::PushFront { value } = &timeline_updates[3]);
assert!(value.is_date_divider());
// The thread event is a reply (because of the reply fallback), and since its
// replied-to timeline item has been updated, it also gets updated.
assert_let!(VectorDiff::Set { index: 1, value } = &timeline_updates[3]);
assert_let!(VectorDiff::Set { index: 2, value } = &timeline_updates[4]);
assert_eq!(
value.as_event().unwrap().content().as_message().unwrap().body(),
"Within thread"
);
// Then the thread summary is updated on the thread root.
assert_let!(VectorDiff::Set { index: 0, value } = &timeline_updates[4]);
assert_let!(VectorDiff::Set { index: 1, value } = &timeline_updates[5]);
assert_matches!(value.as_event().unwrap().content().thread_summary(), Some(_));
assert_let!(VectorDiff::PushFront { value } = &timeline_updates[5]);
assert!(value.is_date_divider());
assert_pending!(timeline_stream);
}
@@ -1021,6 +1021,7 @@ async fn test_thread_timeline_gets_local_echoes() {
assert_let!(VectorDiff::PushBack { value } = &timeline_updates[0]);
let event_item = value.as_event().unwrap();
assert!(event_item.is_local_echo());
assert_matches!(event_item.send_state(), Some(EventSendState::NotSentYet { .. }));
assert!(event_item.event_id().is_none());
// The thread information is properly filled.
@@ -1030,19 +1031,17 @@ async fn test_thread_timeline_gets_local_echoes() {
// Then the local echo morphs into a sent local echo.
assert_let_timeout!(Some(timeline_updates) = stream.next());
assert_eq!(timeline_updates.len(), 3);
assert_eq!(timeline_updates.len(), 1);
// The local event is updated.
assert_let!(VectorDiff::Set { index: 2, value } = &timeline_updates[0]);
let event_item = value.as_event().unwrap();
assert_eq!(event_item.event_id(), Some(sent_event_id));
assert!(event_item.is_local_echo());
assert_let!(Some(EventSendState::Sent { event_id }) = event_item.send_state());
assert_eq!(event_id, sent_event_id);
assert!(event_item.content().reactions().unwrap().is_empty());
// The local event is inserted in the Event Cache as a remote event.
assert_matches!(&timeline_updates[1], VectorDiff::Remove { index: 2 });
assert_let!(VectorDiff::PushBack { value: remote_event } = &timeline_updates[2]);
assert_eq!(remote_event.as_event().unwrap().event_id(), Some(sent_event_id));
// Then nothing else.
assert_pending!(stream);
@@ -1845,7 +1844,8 @@ async fn test_permalink_doesnt_listen_to_thread_sync() {
let server = MatrixMockServer::new().await;
let client = client_with_threading_support(&server).await;
client.event_cache().subscribe().unwrap();
let event_cache = client.event_cache();
event_cache.subscribe().unwrap();
let room_id = room_id!("!a:b.c");
let room = server.sync_joined_room(&client, room_id).await;
@@ -1905,14 +1905,9 @@ async fn test_permalink_doesnt_listen_to_thread_sync() {
.mount()
.await;
let (room_event_cache, _drop_guards) = room.event_cache().await.unwrap();
let outcome = room_event_cache
.thread_pagination(thread_root.to_owned())
.await
.unwrap()
.run_backwards_once(42)
.await
.unwrap();
let (thread_event_cache, _drop_guards) =
event_cache.thread(room_id, thread_root).await.unwrap();
let outcome = thread_event_cache.pagination().run_backwards_once(42).await.unwrap();
assert!(outcome.reached_start.not());
sleep(Duration::from_millis(100)).await;
@@ -1989,6 +1984,8 @@ async fn test_redacted_replied_to_is_updated() {
assert_let!(VectorDiff::PushFront { value } = &timeline_updates[2]);
assert!(value.is_date_divider());
assert_pending!(stream);
// When the first reply is redacted,
server
.sync_room(
@@ -2000,12 +1997,17 @@ async fn test_redacted_replied_to_is_updated() {
// The timeline sees the redaction as a removal,
assert_let_timeout!(Some(timeline_updates) = stream.next());
assert_eq!(timeline_updates.len(), 2);
assert_eq!(timeline_updates.len(), 3);
assert_let!(VectorDiff::Remove { index: 1 } = &timeline_updates[0]);
// The first reply is being redacted by the `m.room.redaction` event the thread
// timeline receives.
assert_let!(VectorDiff::Set { index: 1, value } = &timeline_updates[0]);
let ev1 = value.as_event().unwrap();
assert_eq!(ev1.event_id(), Some(first_reply));
// And then the replied-to update happens independently.
assert_let!(VectorDiff::Set { index: 1, value } = &timeline_updates[1]);
// The second reply is being updated because the event it replies to has been
// updated.
assert_let!(VectorDiff::Set { index: 2, value } = &timeline_updates[1]);
let ev2 = value.as_event().unwrap();
assert_eq!(ev2.event_id(), Some(second_reply));
let msglike = ev2.content().as_msglike().unwrap();
@@ -2013,6 +2015,11 @@ async fn test_redacted_replied_to_is_updated() {
assert_eq!(in_reply_to.event_id, first_reply);
assert_let!(TimelineDetails::Ready(replied_to_event) = &in_reply_to.event);
assert!(replied_to_event.content.is_redacted());
// Finally, the first reply is being removed.
assert_let!(VectorDiff::Remove { index: 1 } = &timeline_updates[2]);
assert_pending!(stream);
}
#[async_test]

View File

@@ -382,6 +382,7 @@ pub struct TimelineVectorDiffs {
}
/// An enum representing where an event has been found.
#[derive(Debug)]
pub(super) enum EventLocation {
/// Event lives in memory (and likely in the store!).
Memory(Position),

View File

@@ -1197,7 +1197,7 @@ mod timed_tests {
}
// After clearing,…
room_event_cache.clear().await.unwrap();
event_cache.clear_all_rooms().await.unwrap();
//… we get an update that the content has been cleared.
assert_let_timeout!(
@@ -1214,15 +1214,14 @@ mod timed_tests {
assert_eq!(received_room_id, room_id);
assert!(generic_stream.is_empty());
// Events individually are not forgotten by the event cache, after clearing a
// room.
assert!(room_event_cache.find_event(event_id1).await.unwrap().is_some());
// Events are forgotten by the event cache, after clearing a room.
assert!(room_event_cache.find_event(event_id1).await.unwrap().is_none());
// But their presence in a linked chunk is forgotten.
// And their presence in a linked chunk is forgotten.
let items = room_event_cache.events().await.unwrap();
assert!(items.is_empty());
// The event cache store too.
// The event cache store is fully empty.
let linked_chunk = from_all_chunks::<3, _, _>(
event_cache_store.load_all_chunks(LinkedChunkId::Room(room_id)).await.unwrap(),
)

View File

@@ -42,7 +42,6 @@ pub(super) use self::{
use super::{
super::Result,
EventsOrigin, TimelineVectorDiffs,
lock::Reload as _,
room::{RoomEventCacheGenericUpdate, RoomEventCacheLinkedChunkUpdate},
};
use crate::room::WeakRoom;
@@ -220,7 +219,7 @@ impl ThreadEventCache {
/// It starts by looking into loaded events in `EventLinkedChunk` before
/// looking inside the storage.
#[cfg(test)]
pub async fn find_event(
async fn find_event(
&self,
event_id: &EventId,
) -> Result<Option<(super::EventLocation, Event)>> {
@@ -286,9 +285,7 @@ mod timed_tests {
room_id, user_id,
};
use super::super::{
super::RoomEventCacheGenericUpdate, TimelineVectorDiffs, room::RoomEventCacheUpdate,
};
use super::super::{super::RoomEventCacheGenericUpdate, TimelineVectorDiffs};
use crate::test_utils::client::MockClientBuilder;
#[async_test]
@@ -317,15 +314,11 @@ mod timed_tests {
event_cache.subscribe().unwrap();
client.base_client().get_or_create_room(room_id, RoomState::Joined);
let room = client.get_room(room_id).unwrap();
let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
let (room_events, mut room_stream) = room_event_cache.subscribe().await.unwrap();
let (thread_events, mut thread_stream) =
room_event_cache.subscribe_to_thread(thread_root.to_owned()).await.unwrap();
let (thread_event_cache, _drop_handles) =
event_cache.thread(room_id, thread_root).await.unwrap();
let (thread_events, mut thread_stream) = thread_event_cache.subscribe().await.unwrap();
assert!(room_events.is_empty());
assert!(thread_events.is_empty());
// Propagate an update for a message and a prev-batch token.
@@ -340,38 +333,17 @@ mod timed_tests {
],
};
room_event_cache
thread_event_cache
.handle_joined_room_update(JoinedRoomUpdate { timeline, ..Default::default() })
.await
.unwrap();
// Checking the update are corrects.
assert_matches!(
generic_stream.recv().await,
Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
assert_eq!(expected_room_id, room_id);
}
);
assert!(generic_stream.is_empty());
assert_matches!(
room_stream.recv().await,
Ok(RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. })) => {
assert_eq!(diffs.len(), 2);
assert_matches!(&diffs[0], VectorDiff::Clear);
assert_matches!(&diffs[1], VectorDiff::Append { values: events } => {
assert_eq!(events.len(), 1);
assert_eq!(events[0].event_id().as_deref(), Some(thread_event_id_0));
});
}
);
assert!(room_stream.is_empty());
assert_matches!(
thread_stream.recv().await,
Ok(TimelineVectorDiffs { diffs, .. }) => {
assert_eq!(diffs.len(), 1);
assert_matches!(&diffs[0], VectorDiff::Append { values: events } => {
assert_eq!(diffs.len(), 2);
assert_matches!(&diffs[0], VectorDiff::Clear);
assert_matches!(&diffs[1], VectorDiff::Append { values: events } => {
assert_eq!(events.len(), 1);
assert_eq!(events[0].event_id().as_deref(), Some(thread_event_id_0));
});
@@ -435,10 +407,9 @@ mod timed_tests {
event_cache.subscribe().unwrap();
client.base_client().get_or_create_room(room_id, RoomState::Joined);
let room = client.get_room(room_id).unwrap();
let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
let (thread_event_cache, _drop_handles) =
event_cache.thread(room_id, thread_root).await.unwrap();
// Propagate an update for a message with bundled relations.
let timeline = Timeline {
@@ -453,23 +424,14 @@ mod timed_tests {
],
};
room_event_cache
thread_event_cache
.handle_joined_room_update(JoinedRoomUpdate { timeline, ..Default::default() })
.await
.unwrap();
// Just checking the generic update is correct.
assert_matches!(
generic_stream.recv().await,
Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
assert_eq!(expected_room_id, room_id);
}
);
assert!(generic_stream.is_empty());
// The in-memory linked chunk keeps the bundled relation.
{
let events = room_event_cache.events().await.unwrap();
let (events, _) = thread_event_cache.subscribe().await.unwrap();
assert_eq!(events.len(), 1);
@@ -484,7 +446,10 @@ mod timed_tests {
// The one in storage does not.
let linked_chunk = from_all_chunks::<3, _, _>(
event_cache_store.load_all_chunks(LinkedChunkId::Room(room_id)).await.unwrap(),
event_cache_store
.load_all_chunks(LinkedChunkId::Thread(room_id, thread_root))
.await
.unwrap(),
)
.unwrap()
.unwrap();
@@ -588,30 +553,17 @@ mod timed_tests {
event_cache.subscribe().unwrap();
client.base_client().get_or_create_room(room_id, RoomState::Joined);
let room = client.get_room(room_id).unwrap();
let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
let (thread_event_cache, _drop_handles) =
event_cache.thread(room_id, thread_root).await.unwrap();
let (thread_events, mut thread_stream) = thread_event_cache.subscribe().await.unwrap();
let (thread_events, mut thread_stream) =
room_event_cache.subscribe_to_thread(thread_root.to_owned()).await.unwrap();
let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
// The thread knows about all cached events.
{
assert!(
room_event_cache
.find_event_in_thread(thread_root.to_owned(), thread_event_id_0)
.await
.unwrap()
.is_some()
);
assert!(
room_event_cache
.find_event_in_thread(thread_root.to_owned(), thread_event_id_1)
.await
.unwrap()
.is_some()
);
assert!(thread_event_cache.find_event(thread_event_id_0).await.unwrap().is_some());
assert!(thread_event_cache.find_event(thread_event_id_1).await.unwrap().is_some());
}
// But only part of events are loaded from the store.
@@ -626,13 +578,7 @@ mod timed_tests {
// Let's load more chunks to load all events.
{
room_event_cache
.thread_pagination(thread_root.to_owned())
.await
.unwrap()
.run_backwards_once(20)
.await
.unwrap();
thread_event_cache.pagination().run_backwards_once(20).await.unwrap();
assert_matches!(
thread_stream.recv().await,
@@ -656,7 +602,7 @@ mod timed_tests {
}
// After clearing,…
room_event_cache.clear().await.unwrap();
event_cache.clear_all_rooms().await.unwrap();
//… we get an update that the content has been cleared.
assert_matches!(
@@ -676,29 +622,16 @@ mod timed_tests {
);
assert!(generic_stream.is_empty());
// Events individually are not forgotten by the event cache, after clearing a
// room and the threads.
assert!(
room_event_cache
.find_event_in_thread(thread_root.to_owned(), thread_event_id_0)
.await
.unwrap()
.is_some()
);
assert!(
room_event_cache
.find_event_in_thread(thread_root.to_owned(), thread_event_id_1)
.await
.unwrap()
.is_some()
);
// Events individually are forgotten by the event cache, after clearing the
// threads.
assert!(thread_event_cache.find_event(thread_event_id_0).await.unwrap().is_none());
assert!(thread_event_cache.find_event(thread_event_id_1).await.unwrap().is_none());
// But their presence in a linked chunk is forgotten.
let (thread_events, _) =
room_event_cache.subscribe_to_thread(thread_root.to_owned()).await.unwrap();
// And their presence in a linked chunk is forgotten.
let (thread_events, _) = thread_event_cache.subscribe().await.unwrap();
assert!(thread_events.is_empty());
// The event cache store too.
// The event cache store is totally empty.
let linked_chunk = from_all_chunks::<3, _, _>(
event_cache_store
.load_all_chunks(LinkedChunkId::Thread(room_id, thread_root))
@@ -796,13 +729,12 @@ mod timed_tests {
event_cache.subscribe().unwrap();
client.base_client().get_or_create_room(room_id, RoomState::Joined);
let room = client.get_room(room_id).unwrap();
// Let's check whether the generic updates are received for the initialisation.
let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
let (thread_events, mut thread_stream) =
room_event_cache.subscribe_to_thread(thread_root.to_owned()).await.unwrap();
let (thread_event_cache, _drop_handles) =
event_cache.thread(room_id, thread_root).await.unwrap();
let (thread_events, mut thread_stream) = thread_event_cache.subscribe().await.unwrap();
// The room **and** the thread have been loaded. Two generic updates must have
// been triggered.
@@ -824,29 +756,11 @@ mod timed_tests {
// The thread knows all events in the storage though, even if they aren't
// loaded.
assert!(
room_event_cache
.find_event_in_thread(thread_root.to_owned(), thread_event_id_0)
.await
.unwrap()
.is_some()
);
assert!(
room_event_cache
.find_event_in_thread(thread_root.to_owned(), thread_event_id_1)
.await
.unwrap()
.is_some()
);
assert!(thread_event_cache.find_event(thread_event_id_0).await.unwrap().is_some());
assert!(thread_event_cache.find_event(thread_event_id_1).await.unwrap().is_some());
// Let's paginate to load more events.
room_event_cache
.thread_pagination(thread_root.to_owned())
.await
.unwrap()
.run_backwards_once(20)
.await
.unwrap();
thread_event_cache.pagination().run_backwards_once(20).await.unwrap();
assert_matches!(
thread_stream.recv().await,
@@ -871,7 +785,7 @@ mod timed_tests {
// A new update with one of these events leads to deduplication.
let timeline = Timeline { limited: false, prev_batch: None, events: vec![thread_event_1] };
room_event_cache
thread_event_cache
.handle_joined_room_update(JoinedRoomUpdate { timeline, ..Default::default() })
.await
.unwrap();
@@ -884,8 +798,7 @@ mod timed_tests {
// when subscribing, to check that the events correspond to their new
// positions. The duplicated item is removed (so it's not the first
// element anymore), and it's added to the back of the list.
let (thread_events, _) =
room_event_cache.subscribe_to_thread(thread_root.to_owned()).await.unwrap();
let (thread_events, _) = thread_event_cache.subscribe().await.unwrap();
assert_eq!(thread_events.len(), 2);
assert_eq!(thread_events[0].event_id().as_deref(), Some(thread_event_id_0));
assert_eq!(thread_events[1].event_id().as_deref(), Some(thread_event_id_1));
@@ -947,11 +860,10 @@ mod timed_tests {
event_cache.subscribe().unwrap();
client.base_client().get_or_create_room(room_id, RoomState::Joined);
let room = client.get_room(room_id).unwrap();
let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
let (thread_events, _) =
room_event_cache.subscribe_to_thread(thread_root.to_owned()).await.unwrap();
let (thread_event_cache, _drop_handles) =
event_cache.thread(room_id, thread_root).await.unwrap();
let (thread_events, _) = thread_event_cache.subscribe().await.unwrap();
// Because the persisted content was invalid, the thread store is reset:
// there are no events in the cache.
@@ -1052,7 +964,7 @@ mod timed_tests {
.unwrap();
// Subscribe the event caches, and create the room.
let (room_event_cache_p0, room_event_cache_p1) = {
let (thread_event_cache_p0, thread_event_cache_p1) = {
let event_cache_p0 = client_p0.event_cache();
event_cache_p0.subscribe().unwrap();
@@ -1062,23 +974,23 @@ mod timed_tests {
client_p0.base_client().get_or_create_room(room_id, RoomState::Joined);
client_p1.base_client().get_or_create_room(room_id, RoomState::Joined);
let (room_event_cache_p0, _drop_handles) =
client_p0.get_room(room_id).unwrap().event_cache().await.unwrap();
let (room_event_cache_p1, _drop_handles) =
client_p1.get_room(room_id).unwrap().event_cache().await.unwrap();
let (thread_event_cache_p0, _drop_handles) =
event_cache_p0.thread(room_id, thread_root).await.unwrap();
let (thread_event_cache_p1, _drop_handles) =
event_cache_p1.thread(room_id, thread_root).await.unwrap();
(room_event_cache_p0, room_event_cache_p1)
(thread_event_cache_p0, thread_event_cache_p1)
};
// Okay. We are ready for the test!
//
// First off, let's check `room_event_cache_p0` has access to the first event
// First off, let's check `thread_event_cache_p0` has access to the first event
// loaded in-memory, then do a pagination, and see more events.
let mut updates_stream_p0 = {
let room_event_cache = &room_event_cache_p0;
let thread_event_cache = &thread_event_cache_p0;
let (initial_updates, mut updates_stream) =
room_event_cache_p0.subscribe_to_thread(thread_root.to_owned()).await.unwrap();
thread_event_cache_p0.subscribe().await.unwrap();
// Initial updates contain `thread_event_id_1` only.
assert_eq!(initial_updates.len(), 1);
@@ -1086,13 +998,7 @@ mod timed_tests {
assert!(updates_stream.is_empty());
// Load one more event with a backpagination.
room_event_cache
.thread_pagination(thread_root.to_owned())
.await
.unwrap()
.run_backwards_once(1)
.await
.unwrap();
thread_event_cache.pagination().run_backwards_once(1).await.unwrap();
// A new update for `ev_id_0` must be present.
assert_matches!(
@@ -1111,11 +1017,11 @@ mod timed_tests {
updates_stream
};
// Second, let's check `room_event_cache_p1` has the same accesses.
// Second, let's check `thread_event_cache_p1` has the same accesses.
let mut updates_stream_p1 = {
let room_event_cache = &room_event_cache_p1;
let thread_event_cache = &thread_event_cache_p1;
let (initial_updates, mut updates_stream) =
room_event_cache_p1.subscribe_to_thread(thread_root.to_owned()).await.unwrap();
thread_event_cache_p1.subscribe().await.unwrap();
// Initial updates contain `thread_event_id_1` only.
assert_eq!(initial_updates.len(), 1);
@@ -1123,13 +1029,7 @@ mod timed_tests {
assert!(updates_stream.is_empty());
// Load one more event with a backpagination.
room_event_cache
.thread_pagination(thread_root.to_owned())
.await
.unwrap()
.run_backwards_once(1)
.await
.unwrap();
thread_event_cache.pagination().run_backwards_once(1).await.unwrap();
// A new update for `thread_event_id_0` must be present.
assert_matches!(
@@ -1150,18 +1050,17 @@ mod timed_tests {
// Do this a couple times, for the fun.
for _ in 0..3 {
// Third, because `room_event_cache_p1` has locked the store, the lock
// is dirty for `room_event_cache_p0`, so it will shrink to its last
// Third, because `thread_event_cache_p1` has locked the store, the lock
// is dirty for `thread_event_cache_p0`, so it will shrink to its last
// chunk for the thread!
{
let room_event_cache = &room_event_cache_p0;
let thread_event_cache = &thread_event_cache_p0;
let updates_stream = &mut updates_stream_p0;
// `thread_event_id_1` must be loaded in memory, just like before.
// However, `thread_event_id_0` must NOT be loaded in memory. It WAS loaded, but
// the state has been reloaded to its last chunk.
let (initial_updates, _) =
room_event_cache.subscribe_to_thread(thread_root.to_owned()).await.unwrap();
let (initial_updates, _) = thread_event_cache.subscribe().await.unwrap();
assert_eq!(initial_updates.len(), 1);
assert_eq!(initial_updates[0].event_id().as_deref(), Some(thread_event_id_1));
@@ -1183,13 +1082,7 @@ mod timed_tests {
);
// Load one more event with a backpagination.
room_event_cache
.thread_pagination(thread_root.to_owned())
.await
.unwrap()
.run_backwards_once(1)
.await
.unwrap();
thread_event_cache.pagination().run_backwards_once(1).await.unwrap();
// `thread_event_id_0` must now be loaded in memory.
// The pagination can be observed via the updates.
@@ -1207,18 +1100,17 @@ mod timed_tests {
);
}
// Fourth, because `room_event_cache_p0` has locked the store again, the lock
// is dirty for `room_event_cache_p1` too!, so it will shrink to its last
// Fourth, because `thread_event_cache_p0` has locked the store again, the lock
// is dirty for `thread_event_cache_p1` too!, so it will shrink to its last
// chunk for the thread!
{
let room_event_cache = &room_event_cache_p1;
let thread_event_cache = &thread_event_cache_p1;
let updates_stream = &mut updates_stream_p1;
// `thread_event_id_1` must be loaded in memory, just like before.
// However, `thread_event_id_0` must NOT be loaded in memory. It WAS loaded, but
// the state has shrunk to its last chunk.
let (initial_updates, _) =
room_event_cache.subscribe_to_thread(thread_root.to_owned()).await.unwrap();
let (initial_updates, _) = thread_event_cache.subscribe().await.unwrap();
assert_eq!(initial_updates.len(), 1);
assert_eq!(initial_updates[0].event_id().as_deref(), Some(thread_event_id_1));
@@ -1240,13 +1132,7 @@ mod timed_tests {
);
// Load one more event with a backpagination.
room_event_cache
.thread_pagination(thread_root.to_owned())
.await
.unwrap()
.run_backwards_once(1)
.await
.unwrap();
thread_event_cache.pagination().run_backwards_once(1).await.unwrap();
// `thread_event_id_0` must now be loaded in memory.
// The pagination can be observed via the updates.

View File

@@ -12,7 +12,6 @@ use matrix_sdk::{
assert_event_matches_msg,
mocks::{MatrixMockServer, RoomRelationsResponseTemplate},
},
timeout::timeout,
};
use matrix_sdk_test::{ALICE, JoinedRoomBuilder, async_test, event_factory::EventFactory};
use ruma::{
@@ -75,7 +74,7 @@ async fn test_thread_contains_its_root_event() {
// Receive an in-thread event.
let f = EventFactory::new().room(room_id).sender(*ALICE);
let room = server
let _room = server
.sync_room(
&client,
JoinedRoomBuilder::new(room_id).add_timeline_event(
@@ -86,10 +85,9 @@ async fn test_thread_contains_its_root_event() {
)
.await;
let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
let (thread_events, mut thread_stream) =
room_event_cache.subscribe_to_thread(thread_root_id.to_owned()).await.unwrap();
let (thread_event_cache, _drop_handles) =
event_cache.thread(room_id, thread_root_id).await.unwrap();
let (thread_events, mut thread_stream) = thread_event_cache.subscribe().await.unwrap();
// Sanity check: the event is added to the thread via the sync.
let mut thread_events = wait_for_initial_events(thread_events, &mut thread_stream).await;
@@ -115,13 +113,7 @@ async fn test_thread_contains_its_root_event() {
.mount()
.await;
let outcome = room_event_cache
.thread_pagination(thread_root_id.to_owned())
.await
.unwrap()
.run_backwards_once(42)
.await
.unwrap();
let outcome = thread_event_cache.pagination().run_backwards_once(42).await.unwrap();
assert!(outcome.reached_start);
assert_let_timeout!(Ok(TimelineVectorDiffs { diffs, .. }) = thread_stream.recv());
@@ -136,7 +128,8 @@ async fn test_ignored_user_empties_threads() {
let client = client_with_threading_support(&server).await;
// Immediately subscribe the event cache to sync updates.
client.event_cache().subscribe().unwrap();
let event_cache = client.event_cache();
event_cache.subscribe().unwrap();
let room_id = room_id!("!omelette:fromage.fr");
@@ -150,7 +143,7 @@ async fn test_ignored_user_empties_threads() {
let second_reply_event_id = event_id!("$second_reply");
// Given a room with a thread, that has two replies.
let room = server
server
.sync_room(
&client,
JoinedRoomBuilder::new(room_id).add_timeline_bulk(vec![
@@ -169,9 +162,9 @@ async fn test_ignored_user_empties_threads() {
.await;
// And we subscribe to the thread,
let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
let (events, mut thread_stream) =
room_event_cache.subscribe_to_thread(thread_root.to_owned()).await.unwrap();
let (thread_event_cache, _drop_handles) =
event_cache.thread(room_id, thread_root).await.unwrap();
let (events, mut thread_stream) = thread_event_cache.subscribe().await.unwrap();
// Then, at first, the thread contains the two initial events.
let events = wait_for_initial_events(events, &mut thread_stream).await;
@@ -228,7 +221,8 @@ async fn test_deduplication() {
let client = client_with_threading_support(&server).await;
// Immediately subscribe the event cache to sync updates.
client.event_cache().subscribe().unwrap();
let event_cache = client.event_cache();
event_cache.subscribe().unwrap();
let room_id = room_id!("!omelette:fromage.fr");
@@ -249,7 +243,7 @@ async fn test_deduplication() {
.into_raw_timeline();
// Given a room with a thread, that has two replies.
let room = server
server
.sync_room(
&client,
JoinedRoomBuilder::new(room_id)
@@ -258,9 +252,9 @@ async fn test_deduplication() {
.await;
// And we subscribe to the thread,
let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
let (events, mut thread_stream) =
room_event_cache.subscribe_to_thread(thread_root.to_owned()).await.unwrap();
let (thread_event_cache, _drop_handles) =
event_cache.thread(room_id, thread_root).await.unwrap();
let (events, mut thread_stream) = thread_event_cache.subscribe().await.unwrap();
// Then, at first, the thread contains the two initial events.
let events = wait_for_initial_events(events, &mut thread_stream).await;
@@ -295,13 +289,7 @@ async fn test_deduplication() {
.mount()
.await;
room_event_cache
.thread_pagination(thread_root.to_owned())
.await
.unwrap()
.run_backwards_once(42)
.await
.unwrap();
thread_event_cache.pagination().run_backwards_once(42).await.unwrap();
// The events were already known, so the stream is still empty.
assert!(thread_stream.is_empty());
@@ -327,6 +315,9 @@ struct ThreadSubscriptionTestSetup {
/// The setup includes 3 events (1 non-mention, 1 mention, and another
/// non-mention) in the same thread, for easy testing of automated
/// subscriptions.
///
/// Note that no events are synced yet. The thread root event is not returned,
/// only its ID.
async fn thread_subscription_test_setup() -> ThreadSubscriptionTestSetup {
let server = MatrixMockServer::new().await;
@@ -346,7 +337,8 @@ async fn thread_subscription_test_setup() -> ThreadSubscriptionTestSetup {
server.mock_versions().with_thread_subscriptions().ok().mount().await;
// Immediately subscribe the event cache to sync updates.
client.event_cache().subscribe().unwrap();
let event_cache = client.event_cache();
event_cache.subscribe().unwrap();
let room_id = room_id!("!omelette:fromage.fr");
let room = server.sync_joined_room(&client, room_id).await;
@@ -492,15 +484,13 @@ async fn test_auto_subscribe_on_thread_paginate() {
let event_cache = s.client.event_cache();
event_cache.subscribe().unwrap();
let mut thread_subscriber_updates =
s.client.event_cache().subscribe_thread_subscriber_updates();
let mut thread_subscriber_updates = event_cache.subscribe_thread_subscriber_updates();
let thread_root_id = event_id!("$thread_root");
let thread_root_id = &s.thread_root;
let thread_resp_id = event_id!("$thread_resp");
// Receive an in-thread event.
let room = s
.server
s.server
.sync_room(
&s.client,
JoinedRoomBuilder::new(&s.room_id).add_timeline_event(
@@ -512,10 +502,10 @@ async fn test_auto_subscribe_on_thread_paginate() {
)
.await;
let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
let (thread_event_cache, _drop_handles) =
event_cache.thread(&s.room_id, thread_root_id).await.unwrap();
let (thread_events, mut thread_stream) =
room_event_cache.subscribe_to_thread(thread_root_id.to_owned()).await.unwrap();
let (thread_events, mut thread_stream) = thread_event_cache.subscribe().await.unwrap();
// Sanity check: the sync event is added to the thread.
let mut thread_events = wait_for_initial_events(thread_events, &mut thread_stream).await;
@@ -554,13 +544,7 @@ async fn test_auto_subscribe_on_thread_paginate() {
.mount()
.await;
let outcome = room_event_cache
.thread_pagination(thread_root_id.to_owned())
.await
.unwrap()
.run_backwards_once(42)
.await
.unwrap();
let outcome = thread_event_cache.pagination().run_backwards_once(42).await.unwrap();
assert!(outcome.reached_start);
// Let the event cache process the update.
@@ -579,15 +563,13 @@ async fn test_auto_subscribe_on_thread_paginate_root_event() {
let event_cache = s.client.event_cache();
event_cache.subscribe().unwrap();
let mut thread_subscriber_updates =
s.client.event_cache().subscribe_thread_subscriber_updates();
let mut thread_subscriber_updates = event_cache.subscribe_thread_subscriber_updates();
let thread_root_id = event_id!("$thread_root");
let thread_root_id = &s.thread_root;
let thread_resp_id = event_id!("$thread_resp");
// Receive an in-thread event.
let room = s
.server
s.server
.sync_room(
&s.client,
JoinedRoomBuilder::new(&s.room_id).add_timeline_event(
@@ -599,10 +581,10 @@ async fn test_auto_subscribe_on_thread_paginate_root_event() {
)
.await;
let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
let (thread_event_cache, _drop_handles) =
event_cache.thread(&s.room_id, thread_root_id).await.unwrap();
let (thread_events, mut thread_stream) =
room_event_cache.subscribe_to_thread(thread_root_id.to_owned()).await.unwrap();
let (thread_events, mut thread_stream) = thread_event_cache.subscribe().await.unwrap();
// Sanity check: the sync event is added to the thread.
let mut thread_events = wait_for_initial_events(thread_events, &mut thread_stream).await;
@@ -645,13 +627,7 @@ async fn test_auto_subscribe_on_thread_paginate_root_event() {
.mount()
.await;
let outcome = room_event_cache
.thread_pagination(thread_root_id.to_owned())
.await
.unwrap()
.run_backwards_once(42)
.await
.unwrap();
let outcome = thread_event_cache.pagination().run_backwards_once(42).await.unwrap();
assert!(outcome.reached_start);
// Let the event cache process the update.
@@ -675,12 +651,12 @@ async fn test_redact_touches_threads() {
let thread_resp1 = s.events[0].get_field::<OwnedEventId>("event_id").unwrap().unwrap();
let thread_resp2 = s.events[1].get_field::<OwnedEventId>("event_id").unwrap().unwrap();
let room = s.server.sync_joined_room(&s.client, &s.room_id).await;
s.server.sync_joined_room(&s.client, &s.room_id).await;
let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
let (thread_event_cache, _drop_handles) =
event_cache.thread(&s.room_id, &thread_root_id).await.unwrap();
let (thread_events, mut thread_stream) =
room_event_cache.subscribe_to_thread(thread_root_id.to_owned()).await.unwrap();
let (thread_events, mut thread_stream) = thread_event_cache.subscribe().await.unwrap();
// Receive a thread root, and a threaded reply.
s.server
@@ -701,6 +677,7 @@ async fn test_redact_touches_threads() {
assert_eq!(thread_events.remove(0).event_id().as_ref(), Some(&thread_resp1));
assert_eq!(thread_events.remove(0).event_id().as_ref(), Some(&thread_resp2));
let (room_event_cache, _drop_handles) = event_cache.room(&s.room_id).await.unwrap();
let (room_events, mut room_stream) = room_event_cache.subscribe().await.unwrap();
assert_eq!(room_events.len(), 3);
@@ -852,12 +829,13 @@ async fn test_edits_touches_threads() {
let thread_root_id = s.thread_root;
let room = s.server.sync_joined_room(&s.client, &s.room_id).await;
s.server.sync_joined_room(&s.client, &s.room_id).await;
let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
let (room_event_cache, _drop_handles) = event_cache.room(&s.room_id).await.unwrap();
let (thread_event_cache, _drop_handles) =
event_cache.thread(&s.room_id, &thread_root_id).await.unwrap();
let (thread_events, mut thread_stream) =
room_event_cache.subscribe_to_thread(thread_root_id.to_owned()).await.unwrap();
let (thread_events, mut thread_stream) = thread_event_cache.subscribe().await.unwrap();
// Receive a thread root, and a threaded reply.
s.server
@@ -873,13 +851,15 @@ async fn test_edits_touches_threads() {
wait_for_initial_events(thread_events, &mut thread_stream).await;
let (room_events, mut room_stream) = room_event_cache.subscribe().await.unwrap();
// A valid edit for the first reply comes through sync.
let valid_edit_event_id = event_id!("$valid_edit");
assert!(room_stream.is_empty());
// The last event in the thread is edited.
let first_edit = event_id!("$foo");
s.server
.sync_room(
&s.client,
JoinedRoomBuilder::new(&s.room_id).add_timeline_event(
f.text_msg("Nobody speaks English anymore.").event_id(valid_edit_event_id).edit(
f.text_msg("Nobody speaks English anymore.").event_id(first_edit).edit(
&room_events[2].event_id().unwrap(),
RoomMessageEventContentWithoutRelation::text_plain("edited text"),
),
@@ -887,58 +867,136 @@ async fn test_edits_touches_threads() {
)
.await;
// Edits are not emitted over the thread subscriber, the timeline uses the
// normal room stream to handle those.
//
// So we're going to look only at the room stream and see if the thread summary
// gets correctly updated.
// Check the room updates.
{
// The room stream receives an update.
assert_let_timeout!(
Ok(RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. })) =
room_stream.recv()
);
assert_eq!(diffs.len(), 2);
// The edit gets appended to the stream.
assert_let!(VectorDiff::Append { values: new_events } = &diffs[0]);
assert_eq!(new_events.len(), 1);
assert_eq!(new_events[0].event_id().as_deref(), Some(valid_edit_event_id));
// The thread summary is updated.
// First update.
{
assert_let!(VectorDiff::Set { index: 0, value: new_root } = &diffs[1]);
assert_eq!(new_root.event_id().as_ref(), Some(&thread_root_id));
let summary = new_root.thread_summary.summary().unwrap();
assert_eq!(summary.latest_reply.as_deref(), Some(valid_edit_event_id));
assert_eq!(summary.num_replies, 2);
assert_let_timeout!(
Ok(RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. })) =
room_stream.recv()
);
assert_eq!(diffs.len(), 1);
// Oh, an edit event.
assert_let!(VectorDiff::Append { values: new_events } = &diffs[0]);
assert_eq!(new_events.len(), 1);
assert_eq!(new_events[0].event_id().as_deref(), Some(first_edit));
}
// Second update.
{
assert_let_timeout!(
Ok(RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. })) =
room_stream.recv()
);
assert_eq!(diffs.len(), 1);
// The thread summary is updated.
{
assert_let!(VectorDiff::Set { index: 0, value: new_root } = &diffs[0]);
assert_eq!(new_root.event_id().as_ref(), Some(&thread_root_id));
let summary = new_root.thread_summary.summary().unwrap();
assert_eq!(summary.latest_reply.as_deref(), Some(first_edit));
assert_eq!(summary.num_replies, 2);
}
}
// That's it.
assert!(room_stream.is_empty());
}
// An invalid edit for the second reply comes through sync.
let invalid_edit_id = event_id!("$invalid_edit");
// Check the thread updates.
{
// First update.
{
assert_let_timeout!(Ok(TimelineVectorDiffs { diffs, .. }) = thread_stream.recv());
assert_eq!(diffs.len(), 1);
// Oh, an edit event.
assert_let!(VectorDiff::Append { values: new_events } = &diffs[0]);
assert_eq!(new_events.len(), 1);
assert_eq!(new_events[0].event_id().as_deref(), Some(first_edit));
}
// That's it.
assert!(thread_stream.is_empty());
}
// The event before the last event is edited.
let second_edit = event_id!("$bar");
s.server
.sync_room(
&s.client,
JoinedRoomBuilder::new(&s.room_id).add_timeline_event(
f.text_msg("Nobody speaks english anymore.").event_id(invalid_edit_id).edit(
&room_events[2].event_id().unwrap(),
f.text_msg("Nobody speaks english anymore.").event_id(second_edit).edit(
&room_events[1].event_id().unwrap(),
RoomMessageEventContentWithoutRelation::text_plain("edited text"),
),
),
)
.await;
// It's a bit hard to know when the update should have been ready. This makes it
// hard to prove that no update happened.
let result = timeout(room_stream.recv(), Duration::from_secs(1)).await;
// Check the room updates.
{
// First update.
{
assert_let_timeout!(
Ok(RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. })) =
room_stream.recv()
);
assert_eq!(diffs.len(), 1);
assert!(result.is_err(), "The room stream should have timed out as the edit was invnalid");
// Oh, an edit event.
assert_let!(VectorDiff::Append { values: new_events } = &diffs[0]);
assert_eq!(new_events.len(), 1);
assert_eq!(new_events[0].event_id().as_deref(), Some(second_edit));
}
// Second update.
{
assert_let_timeout!(
Ok(RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. })) =
room_stream.recv()
);
assert_eq!(diffs.len(), 1);
// The thread summary is updated but… to the same value!
// It is always updated as soon as an update happens in the cache.
{
assert_let!(VectorDiff::Set { index: 0, value: new_root } = &diffs[0]);
assert_eq!(new_root.event_id().as_ref(), Some(&thread_root_id));
let summary = new_root.thread_summary.summary().unwrap();
// But the `latest_reply` is still `first_edit`, not `second_edit`!
assert_eq!(summary.latest_reply.as_deref(), Some(first_edit));
assert_eq!(summary.num_replies, 2);
}
}
// That's it.
assert!(room_stream.is_empty());
}
// Check the thread updates.
{
// First update.
{
assert_let_timeout!(Ok(TimelineVectorDiffs { diffs, .. }) = thread_stream.recv());
assert_eq!(diffs.len(), 1);
// Oh, an edit event.
assert_let!(VectorDiff::Append { values: new_events } = &diffs[0]);
assert_eq!(new_events.len(), 1);
assert_eq!(new_events[0].event_id().as_deref(), Some(second_edit));
}
// That's it.
assert!(thread_stream.is_empty());
}
let room_events = room_event_cache.events().await.unwrap();
let first = room_events.first().unwrap();
let thread_summary = first.thread_summary.summary().unwrap();
// The latest reply should still be our valid event, not our invalid one.
assert_eq!(thread_summary.latest_reply.as_deref(), Some(valid_edit_event_id));
// The latest reply should still be our first edit, not the second one.
assert_eq!(thread_summary.latest_reply.as_deref(), Some(first_edit));
}

View File

@@ -644,8 +644,7 @@ async fn test_room_keys_received_on_notification_client_trigger_redecryption() {
for _ in 0..10 {
{
// Clear any previously received previous-batch token.
let (room_event_cache, _drop_handles) = bob_room.event_cache().await.unwrap();
room_event_cache.clear().await.unwrap();
bob.event_cache().clear_all_rooms().await.unwrap();
}
timeline