feat(event cache): enable storage by default \o/

This commit is contained in:
Benjamin Bouvier
2025-01-13 14:28:14 +01:00
parent 1d901ec12a
commit 13a2a8757e
11 changed files with 130 additions and 520 deletions

View File

@@ -288,10 +288,6 @@ pub struct ClientBuilder {
room_key_recipient_strategy: CollectStrategy,
decryption_trust_requirement: TrustRequirement,
request_config: Option<RequestConfig>,
/// Whether to enable use of the event cache store, for reloading events
/// when building timelines et al.
use_event_cache_persistent_storage: bool,
}
#[matrix_sdk_ffi_macros::export]
@@ -326,27 +322,9 @@ impl ClientBuilder {
room_key_recipient_strategy: Default::default(),
decryption_trust_requirement: TrustRequirement::Untrusted,
request_config: Default::default(),
use_event_cache_persistent_storage: false,
})
}
/// Whether to use the event cache persistent storage or not.
///
/// This is a temporary feature flag, for testing the event cache's
/// persistent storage. Follow new developments in https://github.com/matrix-org/matrix-rust-sdk/issues/3280.
///
/// This is disabled by default. When disabled, a one-time cleanup is
/// performed when creating the client, and it will clear all the events
/// previously stored in the event cache.
///
/// When enabled, it will attempt to store events in the event cache as
/// they're received, and reuse them when reconstructing timelines.
pub fn use_event_cache_persistent_storage(self: Arc<Self>, value: bool) -> Arc<Self> {
let mut builder = unwrap_or_clone_arc(self);
builder.use_event_cache_persistent_storage = value;
Arc::new(builder)
}
pub fn cross_process_store_locks_holder_name(
self: Arc<Self>,
holder_name: String,
@@ -739,19 +717,6 @@ impl ClientBuilder {
let sdk_client = inner_builder.build().await?;
if builder.use_event_cache_persistent_storage {
// Enable the persistent storage \o/
sdk_client.event_cache().enable_storage()?;
} else {
// Get rid of all the previous events, if any.
let store = sdk_client
.event_cache_store()
.lock()
.await
.map_err(EventCacheError::LockingStorage)?;
store.clear_all_rooms_chunks().await.map_err(EventCacheError::Storage)?;
}
Ok(Arc::new(
Client::new(sdk_client, builder.enable_oidc_refresh_lock, builder.session_delegate)
.await?,

View File

@@ -134,7 +134,6 @@ async fn test_an_utd_from_the_event_cache_as_an_initial_item_is_decrypted() {
// Set up the event cache.
let event_cache = client.event_cache();
event_cache.subscribe().unwrap();
event_cache.enable_storage().unwrap();
let room = mock_server.sync_joined_room(&client, room_id).await;
let timeline = room.timeline().await.unwrap();
@@ -289,7 +288,6 @@ async fn test_an_utd_from_the_event_cache_as_a_paginated_item_is_decrypted() {
// Set up the event cache.
let event_cache = client.event_cache();
event_cache.subscribe().unwrap();
event_cache.enable_storage().unwrap();
let room = mock_server.sync_joined_room(&client, room_id).await;
let timeline = room.timeline().await.unwrap();

View File

@@ -749,7 +749,6 @@ async fn test_timeline_receives_a_limited_number_of_events_when_subscribing() {
// Set up the event cache.
let event_cache = client.event_cache();
event_cache.subscribe().unwrap();
event_cache.enable_storage().unwrap();
let room = client.get_room(room_id).unwrap();

View File

@@ -267,9 +267,6 @@ async fn test_wait_for_token() {
let room_id = room_id!("!a98sd12bjh:example.org");
let (client, server) = logged_in_client_with_server().await;
client.event_cache().subscribe().unwrap();
client.event_cache().enable_storage().unwrap();
let sync_settings = SyncSettings::new().timeout(Duration::from_millis(3000));
let f = EventFactory::new();
@@ -309,6 +306,7 @@ async fn test_wait_for_token() {
let paginate = async {
timeline.paginate_backwards(10).await.unwrap();
};
let observe_paginating = async {
assert_eq!(back_pagination_status.next().await, Some(RoomPaginationStatus::Paginating));
assert_eq!(
@@ -316,11 +314,13 @@ async fn test_wait_for_token() {
Some(RoomPaginationStatus::Idle { hit_timeline_start: false })
);
};
let sync = async {
// Make sure syncing starts a little bit later than pagination
sleep(Duration::from_millis(100)).await;
client.sync_once(sync_settings.clone()).await.unwrap();
};
timeout(Duration::from_secs(4), join3(paginate, observe_paginating, sync)).await.unwrap();
// Make sure pagination was called (with the right parameters)
@@ -405,9 +405,10 @@ async fn test_timeline_reset_while_paginating() {
let room = client.get_room(room_id).unwrap();
let timeline = Arc::new(room.timeline().await.unwrap());
let alice_event = f.text_msg("live event!").sender(&ALICE).room(room_id).into_raw();
sync_builder.add_joined_room(
JoinedRoomBuilder::new(room_id)
.add_timeline_event(f.text_msg("live event!").sender(&ALICE))
.add_timeline_event(alice_event.clone())
.set_timeline_prev_batch("pagination_1".to_owned())
.set_timeline_limited(),
);
@@ -425,7 +426,10 @@ async fn test_timeline_reset_while_paginating() {
);
mock_sync(&server, sync_builder.build_json_sync_response(), None).await;
// pagination with first token
// The pagination with the first token will be hit twice:
// - first, before the sync response comes, then the gap is stored in the cache.
// - second, after all other gaps have been resolved, we get back to resolving
// this one.
Mock::given(method("GET"))
.and(path_regex(r"^/_matrix/client/r0/rooms/.*/messages$"))
.and(header("authorization", "Bearer 1234"))
@@ -435,23 +439,25 @@ async fn test_timeline_reset_while_paginating() {
.set_body_json(json!({
"chunk": [],
"start": "pagination_1",
"end": "some_other_token",
"end": "pagination_3",
}))
// Make sure the concurrent sync request returns first
.set_delay(Duration::from_millis(200)),
)
.expect(1)
.expect(2)
.named("pagination_1")
.mount(&server)
.await;
// pagination with second token
// The pagination with the second token must return Alice's event, to be
// consistent with the room's timeline (Alice's event was observed before
// Bob's event).
Mock::given(method("GET"))
.and(path_regex(r"^/_matrix/client/r0/rooms/.*/messages$"))
.and(header("authorization", "Bearer 1234"))
.and(query_param("from", "pagination_2"))
.respond_with(ResponseTemplate::new(200).set_body_json(json!({
"chunk": [],
"chunk": [alice_event],
"start": "pagination_2",
})))
.expect(1)
@@ -459,12 +465,36 @@ async fn test_timeline_reset_while_paginating() {
.mount(&server)
.await;
// pagination with third token
Mock::given(method("GET"))
.and(path_regex(r"^/_matrix/client/r0/rooms/.*/messages$"))
.and(header("authorization", "Bearer 1234"))
.and(query_param("from", "pagination_3"))
.respond_with(ResponseTemplate::new(200).set_body_json(json!({
"chunk": [],
"start": "pagination_3",
})))
.expect(1)
.named("pagination_3")
.mount(&server)
.await;
let (_, mut back_pagination_status) = timeline.live_back_pagination_status().await.unwrap();
let paginate = async { timeline.paginate_backwards(10).await.unwrap() };
let paginate = async {
let mut hit_start;
loop {
hit_start = timeline.paginate_backwards(10).await.unwrap();
if hit_start {
break;
}
}
hit_start
};
let observe_paginating = async {
let mut seen_paginating = false;
let mut seen_idle_no_start = false;
// Observe paginating updates: we want to make sure we see at least once
// Paginating, and that it settles with Idle.
@@ -476,12 +506,16 @@ async fn test_timeline_reset_while_paginating() {
if state == RoomPaginationStatus::Paginating {
seen_paginating = true;
}
if matches!(state, RoomPaginationStatus::Idle { hit_timeline_start: false }) {
seen_idle_no_start = true;
}
}
None => break,
}
}
assert!(seen_paginating);
assert!(seen_idle_no_start);
let (status, _) = timeline.live_back_pagination_status().await.unwrap();
@@ -502,10 +536,10 @@ async fn test_timeline_reset_while_paginating() {
assert!(hit_start);
// No events in back-pagination responses, start of timeline + date divider +
// event from latest sync is present
assert_eq!(timeline.items().await.len(), 3);
// all events from the previous syncs are present.
assert_eq!(timeline.items().await.len(), 4);
// Make sure both pagination mocks were called
// Make sure both pagination mocks were called.
server.verify().await;
}
@@ -1085,6 +1119,8 @@ async fn test_lazy_back_pagination() {
let hit_end_of_timeline = timeline.paginate_backwards(5).await.unwrap();
// There was no previous-batch token in the previous /messages response, so
// we've hit the start of the timeline.
assert!(hit_end_of_timeline);
// The start of the timeline is inserted as its own timeline update.
@@ -1097,12 +1133,12 @@ async fn test_lazy_back_pagination() {
//
// They are inserted after the date divider and start of timeline, hence the
// indices 2 and 3.
assert_timeline_stream! {
[timeline_stream]
insert[2] "$ev201";
insert[3] "$ev200";
};
// So cool.
assert_pending!(timeline_stream);

View File

@@ -242,7 +242,6 @@ async fn test_cached_events_are_kept_for_different_room_instances() {
// Subscribe to the event cache.
let event_cache = client.event_cache();
event_cache.subscribe().unwrap();
event_cache.enable_storage().unwrap();
let f = EventFactory::new().room(room_id).sender(*BOB);
let pinned_event = f

View File

@@ -168,23 +168,6 @@ impl EventCache {
}
}
/// Enable storing updates to storage, and reload events from storage.
///
/// Has an effect only the first time it's called. It's safe to call it
/// multiple times.
pub fn enable_storage(&self) -> Result<()> {
let _ = self.inner.store.get_or_try_init::<_, EventCacheError>(|| {
let client = self.inner.client()?;
Ok(client.event_cache_store().clone())
})?;
Ok(())
}
/// Check whether the storage is enabled or not.
pub fn has_storage(&self) -> bool {
self.inner.has_storage()
}
/// Starts subscribing the [`EventCache`] to sync responses, if not done
/// before.
///
@@ -193,6 +176,13 @@ impl EventCache {
pub fn subscribe(&self) -> Result<()> {
let client = self.inner.client()?;
// Initialize storage.
let _ = self.inner.store.get_or_try_init::<_, EventCacheError>(|| {
let client = self.inner.client()?;
Ok(client.event_cache_store().clone())
})?;
// Initialize the drop handles.
let _ = self.inner.drop_handles.get_or_init(|| {
// Spawn the task that will listen to all the room updates at once.
let listen_updates_task = spawn(Self::listen_task(
@@ -411,11 +401,6 @@ impl EventCacheInner {
self.client.get().ok_or(EventCacheError::ClientDropped)
}
/// Has persistent storage been enabled for the event cache?
fn has_storage(&self) -> bool {
self.store.get().is_some()
}
/// Clears all the room's data.
async fn clear_all_rooms(&self) -> Result<()> {
// Okay, here's where things get complicated.
@@ -499,9 +484,7 @@ impl EventCacheInner {
for (room_id, left_room_update) in updates.left {
let room = self.for_room(&room_id).await?;
if let Err(err) =
room.inner.handle_left_room_update(self.has_storage(), left_room_update).await
{
if let Err(err) = room.inner.handle_left_room_update(left_room_update).await {
// Non-fatal error, try to continue to the next room.
error!("handling left room update: {err}");
}
@@ -511,9 +494,7 @@ impl EventCacheInner {
for (room_id, joined_room_update) in updates.joined {
let room = self.for_room(&room_id).await?;
if let Err(err) =
room.inner.handle_joined_room_update(self.has_storage(), joined_room_update).await
{
if let Err(err) = room.inner.handle_joined_room_update(joined_room_update).await {
// Non-fatal error, try to continue to the next room.
error!(%room_id, "handling joined room update: {err}");
}
@@ -716,10 +697,7 @@ mod tests {
room_event_cache
.inner
.handle_joined_room_update(
event_cache.inner.has_storage(),
JoinedRoomUpdate { account_data, ..Default::default() },
)
.handle_joined_room_update(JoinedRoomUpdate { account_data, ..Default::default() })
.await
.unwrap();
@@ -798,7 +776,6 @@ mod tests {
let event_cache = client.event_cache();
event_cache.subscribe().unwrap();
event_cache.enable_storage().unwrap();
let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
let event_id = event_id!("$1");

View File

@@ -337,13 +337,8 @@ impl RoomEventCacheInner {
}
#[instrument(skip_all, fields(room_id = %self.room_id))]
pub(super) async fn handle_joined_room_update(
&self,
has_storage: bool,
updates: JoinedRoomUpdate,
) -> Result<()> {
pub(super) async fn handle_joined_room_update(&self, updates: JoinedRoomUpdate) -> Result<()> {
self.handle_timeline(
has_storage,
updates.timeline,
updates.ephemeral.clone(),
updates.ambiguity_changes,
@@ -355,106 +350,14 @@ impl RoomEventCacheInner {
Ok(())
}
#[instrument(skip_all, fields(room_id = %self.room_id))]
pub(super) async fn handle_left_room_update(&self, updates: LeftRoomUpdate) -> Result<()> {
self.handle_timeline(updates.timeline, Vec::new(), updates.ambiguity_changes).await?;
Ok(())
}
async fn handle_timeline(
&self,
has_storage: bool,
timeline: Timeline,
ephemeral_events: Vec<Raw<AnySyncEphemeralRoomEvent>>,
ambiguity_changes: BTreeMap<OwnedEventId, AmbiguityChange>,
) -> Result<()> {
if !has_storage && timeline.limited {
// Ideally we'd try to reconcile existing events against those received in the
// timeline, but we're not there yet. In the meanwhile, clear the
// items from the room. TODO: implement Smart Matching™.
trace!("limited timeline, clearing all previous events and pushing new events");
self.replace_all_events_by(
timeline.events,
timeline.prev_batch,
ephemeral_events,
ambiguity_changes,
EventsOrigin::Sync,
)
.await?;
} else {
// Add all the events to the backend.
trace!("adding new events");
let mut state = self.state.write().await;
self.append_events_locked(
has_storage,
&mut state,
timeline,
ephemeral_events,
ambiguity_changes,
)
.await?;
}
Ok(())
}
#[instrument(skip_all, fields(room_id = %self.room_id))]
pub(super) async fn handle_left_room_update(
&self,
has_storage: bool,
updates: LeftRoomUpdate,
) -> Result<()> {
self.handle_timeline(has_storage, updates.timeline, Vec::new(), updates.ambiguity_changes)
.await?;
Ok(())
}
/// Remove existing events, and append a set of events to the room cache and
/// storage, notifying observers.
pub(super) async fn replace_all_events_by(
&self,
timeline_events: Vec<TimelineEvent>,
prev_batch: Option<String>,
ephemeral_events: Vec<Raw<AnySyncEphemeralRoomEvent>>,
ambiguity_changes: BTreeMap<OwnedEventId, AmbiguityChange>,
events_origin: EventsOrigin,
) -> Result<()> {
// Acquire the lock.
let mut state = self.state.write().await;
// Reset the room's state.
let updates_as_vector_diffs = state.reset().await?;
// Propagate to observers.
let _ = self.sender.send(RoomEventCacheUpdate::UpdateTimelineEvents {
diffs: updates_as_vector_diffs,
origin: events_origin,
});
// Push the new events.
// This method is only used when we don't have storage, and
// it's conservative to consider that this new timeline is "limited",
// since we don't know if we have a gap or not.
let has_storage = false;
let limited = true;
self.append_events_locked(
has_storage,
&mut state,
Timeline { limited, prev_batch, events: timeline_events },
ephemeral_events,
ambiguity_changes,
)
.await?;
Ok(())
}
/// Append a set of events to the room cache and storage, notifying
/// observers.
///
/// This is a private implementation. It must not be exposed publicly.
async fn append_events_locked(
&self,
has_storage: bool,
state: &mut RoomEventCacheState,
timeline: Timeline,
ephemeral_events: Vec<Raw<AnySyncEphemeralRoomEvent>>,
ambiguity_changes: BTreeMap<OwnedEventId, AmbiguityChange>,
@@ -468,11 +371,18 @@ impl RoomEventCacheInner {
return Ok(());
}
// Ditch the previous-batch token if we have storage, the sync isn't limited and
// we've seen at least one event in the past. In this case (and only this one),
// we should definitely know what the head of the timeline is (either we
// know about all the events, or we have a gap somewhere).
if has_storage && !timeline.limited && state.events().events().next().is_some() {
// Add all the events to the backend.
trace!("adding new events");
let mut state = self.state.write().await;
// Ditch the previous-batch token if the sync isn't limited and we've seen at
// least one event in the past.
//
// In this case (and only this one), we should definitely know what the head of
// the timeline is (either we know about all the events, or we have a
// gap somewhere), since storage is enabled by default.
if !timeline.limited && state.events().events().next().is_some() {
prev_batch = None;
}
@@ -1577,7 +1487,6 @@ mod tests {
let event_cache = client.event_cache();
event_cache.subscribe().unwrap();
event_cache.enable_storage().unwrap();
client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
let room = client.get_room(room_id).unwrap();
@@ -1638,7 +1547,6 @@ mod tests {
let event_cache = client.event_cache();
event_cache.subscribe().unwrap();
event_cache.enable_storage().unwrap();
client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
let room = client.get_room(room_id).unwrap();
@@ -1690,7 +1598,6 @@ mod tests {
// Don't forget to subscribe and like^W enable storage!
event_cache.subscribe().unwrap();
event_cache.enable_storage().unwrap();
client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
let room = client.get_room(room_id).unwrap();
@@ -1706,7 +1613,7 @@ mod tests {
room_event_cache
.inner
.handle_joined_room_update(true, JoinedRoomUpdate { timeline, ..Default::default() })
.handle_joined_room_update(JoinedRoomUpdate { timeline, ..Default::default() })
.await
.unwrap();
@@ -1758,7 +1665,6 @@ mod tests {
// Don't forget to subscribe and like^W enable storage!
event_cache.subscribe().unwrap();
event_cache.enable_storage().unwrap();
client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
let room = client.get_room(room_id).unwrap();
@@ -1775,7 +1681,7 @@ mod tests {
room_event_cache
.inner
.handle_joined_room_update(true, JoinedRoomUpdate { timeline, ..Default::default() })
.handle_joined_room_update(JoinedRoomUpdate { timeline, ..Default::default() })
.await
.unwrap();
@@ -1893,7 +1799,6 @@ mod tests {
// Don't forget to subscribe and like^W enable storage!
event_cache.subscribe().unwrap();
event_cache.enable_storage().unwrap();
client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
let room = client.get_room(room_id).unwrap();
@@ -2037,7 +1942,6 @@ mod tests {
// Don't forget to subscribe and like^W enable storage!
event_cache.subscribe().unwrap();
event_cache.enable_storage().unwrap();
client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
let room = client.get_room(room_id).unwrap();
@@ -2073,7 +1977,7 @@ mod tests {
let timeline = Timeline { limited: false, prev_batch: None, events: vec![ev2] };
room_event_cache
.inner
.handle_joined_room_update(true, JoinedRoomUpdate { timeline, ..Default::default() })
.handle_joined_room_update(JoinedRoomUpdate { timeline, ..Default::default() })
.await
.unwrap();
@@ -2135,7 +2039,6 @@ mod tests {
// Don't forget to subscribe and like^W enable storage!
event_cache.subscribe().unwrap();
event_cache.enable_storage().unwrap();
client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
let room = client.get_room(room_id).unwrap();
@@ -2166,9 +2069,6 @@ mod tests {
let event_cache = client.event_cache();
event_cache.subscribe().unwrap();
let has_storage = true; // for testing purposes only
event_cache.enable_storage().unwrap();
client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
let room = client.get_room(room_id).unwrap();
let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
@@ -2179,17 +2079,14 @@ mod tests {
// prev-batch token.
room_event_cache
.inner
.handle_joined_room_update(
has_storage,
JoinedRoomUpdate {
timeline: Timeline {
limited: true,
prev_batch: Some("raclette".to_owned()),
events: vec![f.text_msg("hey yo").into_event()],
},
..Default::default()
.handle_joined_room_update(JoinedRoomUpdate {
timeline: Timeline {
limited: true,
prev_batch: Some("raclette".to_owned()),
events: vec![f.text_msg("hey yo").into_event()],
},
)
..Default::default()
})
.await
.unwrap();
@@ -2235,17 +2132,14 @@ mod tests {
// this time.
room_event_cache
.inner
.handle_joined_room_update(
has_storage,
JoinedRoomUpdate {
timeline: Timeline {
limited: false,
prev_batch: Some("fondue".to_owned()),
events: vec![f.text_msg("sup").into_event()],
},
..Default::default()
.handle_joined_room_update(JoinedRoomUpdate {
timeline: Timeline {
limited: false,
prev_batch: Some("fondue".to_owned()),
events: vec![f.text_msg("sup").into_event()],
},
)
..Default::default()
})
.await
.unwrap();
@@ -2281,7 +2175,6 @@ mod tests {
let event_cache = client.event_cache();
event_cache.subscribe().unwrap();
event_cache.enable_storage().unwrap();
client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
let room = client.get_room(room_id).unwrap();
@@ -2369,7 +2262,6 @@ mod tests {
let event_cache = client.event_cache();
event_cache.subscribe().unwrap();
event_cache.enable_storage().unwrap();
client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
let room = client.get_room(room_id).unwrap();
@@ -2486,7 +2378,6 @@ mod tests {
let event_cache = client.event_cache();
event_cache.subscribe().unwrap();
event_cache.enable_storage().unwrap();
client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
let room = client.get_room(room_id).unwrap();

View File

@@ -32,7 +32,7 @@ use ruma::{
room_id, user_id, EventId, RoomVersionId,
};
use serde_json::json;
use tokio::{spawn, sync::broadcast, task::yield_now, time::sleep};
use tokio::{spawn, sync::broadcast, time::sleep};
macro_rules! assert_event_id {
($timeline_event:expr, $event_id:literal) => {
@@ -285,6 +285,9 @@ async fn test_backpaginate_once() {
// I'll get all the previous events, in "reverse" order (same as the response).
let BackPaginationOutcome { events, reached_start } = outcome;
// The event cache figures this is the last chunk of events in the room, because
// there's no prior gap this time.
assert!(reached_start);
assert_eq!(events.len(), 2);
@@ -307,6 +310,11 @@ async fn test_backpaginate_once() {
});
assert!(room_stream.is_empty());
// Another back-pagination doesn't return any new information.
let outcome = room_event_cache.pagination().run_backwards_once(20).await.unwrap();
assert!(outcome.events.is_empty());
assert!(outcome.reached_start);
}
#[async_test]
@@ -382,8 +390,9 @@ async fn test_backpaginate_many_times_with_many_iterations() {
}
}
// I'll get all the previous events,
assert_eq!(num_iterations, 2); // in two iterations
// I'll get all the previous events, in two iterations, one for each
// back-pagination.
assert_eq!(num_iterations, 2);
assert_event_matches_msg(&global_events[0], "world");
assert_event_matches_msg(&global_events[1], "hello");
@@ -446,7 +455,7 @@ async fn test_backpaginate_many_times_with_one_iteration() {
let f = EventFactory::new().room(room_id).sender(user_id!("@a:b.c"));
server
let room = server
.sync_room(
&client,
JoinedRoomBuilder::new(room_id)
@@ -458,8 +467,7 @@ async fn test_backpaginate_many_times_with_one_iteration() {
)
.await;
let (room_event_cache, _drop_handles) =
client.get_room(room_id).unwrap().event_cache().await.unwrap();
let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
let (events, mut room_stream) = room_event_cache.subscribe().await;
@@ -647,32 +655,26 @@ async fn test_reset_while_backpaginating() {
// Assert the updates as diffs.
{
// The room shrinks the linked chunk to the last event chunk, so it clears and
// re-adds the latest event.
assert_let_timeout!(
Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = room_stream.recv()
);
assert_eq!(diffs.len(), 1);
// The clear.
assert_eq!(diffs.len(), 2);
assert_matches!(&diffs[0], VectorDiff::Clear);
}
{
assert_let_timeout!(
Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = room_stream.recv()
);
assert_eq!(diffs.len(), 1);
// The event from the sync.
assert_matches!(&diffs[0], VectorDiff::Append { values: events } => {
assert_eq!(events.len(), 1);
assert_event_matches_msg(&events[0], "heyo");
assert_matches!(&diffs[1], VectorDiff::Append { values } => {
assert_eq!(values.len(), 1);
assert_event_matches_msg(&values[0], "heyo");
});
}
{
// Then we receive the event from the restarted back-pagination with
// `second_backpagination`.
assert_let_timeout!(
Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = room_stream.recv()
);
assert_eq!(diffs.len(), 1);
// The event from the pagination.
assert_matches!(&diffs[0], VectorDiff::Insert { index, value: event } => {
assert_eq!(*index, 0);
assert_event_matches_msg(event, "finally!");
@@ -800,14 +802,21 @@ async fn test_limited_timeline_resets_pagination() {
);
// When a limited sync comes back from the server,
server.sync_room(&client, JoinedRoomBuilder::new(room_id).set_timeline_limited()).await;
server
.sync_room(
&client,
JoinedRoomBuilder::new(room_id)
.set_timeline_limited()
.set_timeline_prev_batch("prev-batch".to_owned()),
)
.await;
// We receive an update about the limited timeline.
// We have a limited sync, which triggers a shrink to the latest chunk: a gap.
assert_let_timeout!(
Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = room_stream.recv()
);
assert_eq!(diffs.len(), 1);
assert_let!(VectorDiff::Clear = &diffs[0]);
assert_matches!(&diffs[0], VectorDiff::Clear);
// The paginator state is reset: status set to Idle, hasn't hit the timeline
// start.
@@ -819,72 +828,6 @@ async fn test_limited_timeline_resets_pagination() {
assert!(room_stream.is_empty());
}
#[async_test]
async fn test_persistent_storage_waits_for_pagination_token() {
let server = MatrixMockServer::new().await;
let client = server.client_builder().build().await;
let event_cache = client.event_cache();
// Immediately subscribe the event cache to sync updates.
event_cache.subscribe().unwrap();
// TODO: remove this test when persistent storage is enabled by default, as it's
// doing the same as the one above.
event_cache.enable_storage().unwrap();
// If I sync and get informed I've joined The Room, without a previous batch
// token,
let room_id = room_id!("!omelette:fromage.fr");
let f = EventFactory::new().room(room_id).sender(user_id!("@a:b.c"));
let room = server.sync_joined_room(&client, room_id).await;
let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
let (events, mut room_stream) = room_event_cache.subscribe().await;
assert!(events.is_empty());
assert!(room_stream.is_empty());
server
.mock_room_messages()
.ok(RoomMessagesResponseTemplate::default()
.events(vec![f.text_msg("hi").event_id(event_id!("$2")).into_raw_timeline()]))
.mock_once()
.mount()
.await;
// At the beginning, the paginator is in the initial state.
let pagination = room_event_cache.pagination();
let mut pagination_status = pagination.status();
assert_eq!(pagination_status.get(), RoomPaginationStatus::Idle { hit_timeline_start: false });
// If we try to back-paginate with a token, it will hit the end of the timeline
// and give us the resulting event.
let BackPaginationOutcome { events, reached_start } =
pagination.run_backwards_once(20).await.unwrap();
assert_eq!(events.len(), 1);
assert!(reached_start);
assert_let_timeout!(
Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = room_stream.recv()
);
assert_eq!(diffs.len(), 1);
assert_matches!(&diffs[0], VectorDiff::Append { values: events } => {
assert_eq!(events.len(), 1);
assert_event_matches_msg(&events[0], "hi");
});
// And the paginator state delivers this as an update, and is internally
// consistent with it:
assert_next_matches_with_timeout!(
pagination_status,
RoomPaginationStatus::Idle { hit_timeline_start: true }
);
assert!(room_stream.is_empty());
}
#[async_test]
async fn test_limited_timeline_with_storage() {
let server = MatrixMockServer::new().await;
@@ -894,7 +837,6 @@ async fn test_limited_timeline_with_storage() {
// Don't forget to subscribe and like^W enable storage!
event_cache.subscribe().unwrap();
event_cache.enable_storage().unwrap();
let room_id = room_id!("!galette:saucisse.bzh");
let room = server.sync_joined_room(&client, room_id).await;
@@ -953,80 +895,6 @@ async fn test_limited_timeline_with_storage() {
assert!(subscriber.is_empty());
}
#[async_test]
async fn test_limited_timeline_without_storage() {
let server = MatrixMockServer::new().await;
let client = server.client_builder().build().await;
let event_cache = client.event_cache();
event_cache.subscribe().unwrap();
let room_id = room_id!("!galette:saucisse.bzh");
let room = server.sync_joined_room(&client, room_id).await;
let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
// Get a sync for a non-limited timeline, but with a prev-batch token.
//
// When we don't have storage, we should still keep this prev-batch around,
// because the previous sync events may not have been saved to disk in the
// first place.
server
.sync_room(
&client,
JoinedRoomBuilder::new(room_id)
.add_timeline_event(f.text_msg("hey yo"))
.set_timeline_prev_batch("prev-batch".to_owned()),
)
.await;
let (initial_events, mut subscriber) = room_event_cache.subscribe().await;
// This is racy: either the sync has been handled, or it hasn't yet.
if initial_events.is_empty() {
assert_let_timeout!(
Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = subscriber.recv()
);
assert_eq!(diffs.len(), 1);
assert_let!(VectorDiff::Append { values: events } = &diffs[0]);
assert_eq!(events.len(), 1);
assert_event_matches_msg(&events[0], "hey yo");
} else {
assert_eq!(initial_events.len(), 1);
assert_event_matches_msg(&initial_events[0], "hey yo");
}
// Back-pagination should thus work. The real assertion is in the "mock_once"
// call, which checks this endpoint is called once.
server
.mock_room_messages()
.match_from("prev-batch")
.ok(RoomMessagesResponseTemplate::default()
.events(vec![f.text_msg("oh well").event_id(event_id!("$1"))]))
.mock_once()
.mount()
.await;
// We run back-pagination with success.
room_event_cache.pagination().run_backwards_once(20).await.unwrap();
// And we get the back-paginated event.
assert_let_timeout!(
Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = subscriber.recv()
);
assert_eq!(diffs.len(), 1);
assert_let!(VectorDiff::Insert { index: 0, value: event } = &diffs[0]);
assert_event_matches_msg(event, "oh well");
// That's all, folks!
assert!(subscriber.is_empty());
}
#[async_test]
async fn test_backpaginate_with_no_initial_events() {
let server = MatrixMockServer::new().await;
@@ -1195,7 +1063,6 @@ async fn test_no_gap_stored_after_deduplicated_sync() {
// Immediately subscribe the event cache to sync updates.
event_cache.subscribe().unwrap();
event_cache.enable_storage().unwrap();
let room_id = room_id!("!omelette:fromage.fr");
@@ -1299,7 +1166,6 @@ async fn test_no_gap_stored_after_deduplicated_backpagination() {
// Immediately subscribe the event cache to sync updates.
event_cache.subscribe().unwrap();
event_cache.enable_storage().unwrap();
let room_id = room_id!("!omelette:fromage.fr");
@@ -1424,7 +1290,6 @@ async fn test_dont_delete_gap_that_wasnt_inserted() {
// Immediately subscribe the event cache to sync updates.
event_cache.subscribe().unwrap();
event_cache.enable_storage().unwrap();
let room_id = room_id!("!omelette:fromage.fr");
@@ -1488,7 +1353,6 @@ async fn test_apply_redaction_when_redaction_comes_later() {
// Immediately subscribe the event cache to sync updates.
event_cache.subscribe().unwrap();
event_cache.enable_storage().unwrap();
let room_id = room_id!("!omelette:fromage.fr");
@@ -1611,7 +1475,6 @@ async fn test_apply_redaction_on_an_in_store_event() {
// Set up the event cache.
let event_cache = client.event_cache();
event_cache.subscribe().unwrap();
event_cache.enable_storage().unwrap();
let room = mock_server.sync_joined_room(&client, room_id).await;
let (room_event_cache, _room_event_cache_drop_handle) = room.event_cache().await.unwrap();
@@ -1703,7 +1566,6 @@ async fn test_apply_redaction_when_redacted_and_redaction_are_in_same_sync() {
// Immediately subscribe the event cache to sync updates.
event_cache.subscribe().unwrap();
event_cache.enable_storage().unwrap();
let room_id = room_id!("!omelette:fromage.fr");
let room = server.sync_joined_room(&client, room_id).await;
@@ -1854,7 +1716,6 @@ async fn test_lazy_loading() {
// Set up the event cache.
let event_cache = client.event_cache();
event_cache.subscribe().unwrap();
event_cache.enable_storage().unwrap();
let room = mock_server.sync_joined_room(&client, room_id).await;
let (room_event_cache, _room_event_cache_drop_handle) = room.event_cache().await.unwrap();
@@ -2194,7 +2055,6 @@ async fn test_deduplication() {
// Set up the event cache.
let event_cache = client.event_cache();
event_cache.subscribe().unwrap();
event_cache.enable_storage().unwrap();
let room = mock_server.sync_joined_room(&client, room_id).await;
let (room_event_cache, _room_event_cache_drop_handle) = room.event_cache().await.unwrap();
@@ -2300,118 +2160,12 @@ async fn test_deduplication() {
}
}
#[async_test]
async fn test_timeline_then_empty_timeline_then_deduplication() {
let server = MatrixMockServer::new().await;
let client = server.client_builder().build().await;
client.event_cache().subscribe().unwrap();
let room_id = room_id!("!galette:saucisse.bzh");
let room = server.sync_joined_room(&client, room_id).await;
let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
// Previous batch of events which will be received via /messages, in
// chronological order.
let previous_events = [
f.text_msg("previous1").event_id(event_id!("$prev1")).into_raw_timeline(),
f.text_msg("previous2").event_id(event_id!("$prev2")).into_raw_timeline(),
f.text_msg("previous3").event_id(event_id!("$prev3")).into_raw_timeline(),
];
// Latest events which will be received via /sync, in chronological order.
let latest_events = [
f.text_msg("latest3").event_id(event_id!("$latest3")).into_raw_timeline(),
f.text_msg("latest2").event_id(event_id!("$latest2")).into_raw_timeline(),
f.text_msg("latest1").event_id(event_id!("$latest1")).into_raw_timeline(),
];
let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
let (initial_events, mut subscriber) = room_event_cache.subscribe().await;
assert!(initial_events.is_empty());
// Receive a sync with only the latest events.
server
.sync_room(
&client,
JoinedRoomBuilder::new(room_id)
.set_timeline_prev_batch("token-before-latest")
.add_timeline_bulk(latest_events.clone().into_iter().map(ruma::serde::Raw::cast)),
)
.await;
assert_let_timeout!(
Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = subscriber.recv()
);
assert_eq!(diffs.len(), 1);
assert_let!(VectorDiff::Append { values } = &diffs[0]);
assert_eq!(values.len(), 3);
assert_event_matches_msg(&values[0], "latest3");
assert_event_matches_msg(&values[1], "latest2");
assert_event_matches_msg(&values[2], "latest1");
// Receive a timeline without any items.
server
.sync_room(
&client,
JoinedRoomBuilder::new(room_id).set_timeline_prev_batch("token-after-latest"),
)
.await;
// Let the event cache handle the timeline result.
yield_now().await;
// No update happened.
assert!(subscriber.is_empty());
// Back-paginate.
let all_events = previous_events.into_iter().chain(latest_events).rev().collect::<Vec<_>>();
server
.mock_room_messages()
// The prev_batch from the second sync.
.match_from("token-after-latest")
.ok(RoomMessagesResponseTemplate::default().end_token("messages-end-2").events(all_events))
.named("messages-since-after-latest")
.mount()
.await;
room_event_cache.pagination().run_backwards_once(10).await.unwrap();
assert_let_timeout!(
Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = subscriber.recv()
);
assert_eq!(diffs.len(), 4);
// Yay.
assert_matches!(&diffs[0], VectorDiff::Remove { index: 2 });
assert_matches!(&diffs[1], VectorDiff::Remove { index: 1 });
assert_matches!(&diffs[2], VectorDiff::Remove { index: 0 });
assert_let!(VectorDiff::Append { values } = &diffs[3]);
assert_eq!(values.len(), 6);
assert_event_matches_msg(&values[0], "previous1");
assert_event_matches_msg(&values[1], "previous2");
assert_event_matches_msg(&values[2], "previous3");
assert_event_matches_msg(&values[3], "latest3");
assert_event_matches_msg(&values[4], "latest2");
assert_event_matches_msg(&values[5], "latest1");
// That's all, folks!
assert!(subscriber.is_empty());
}
#[async_test]
async fn test_timeline_then_empty_timeline_then_deduplication_with_storage() {
let server = MatrixMockServer::new().await;
let client = server.client_builder().build().await;
client.event_cache().subscribe().unwrap();
// TODO: remove the other test above, which doesn't enable storage, when
// persistent storage's enabled by default.
client.event_cache().enable_storage().unwrap();
let room_id = room_id!("!galette:saucisse.bzh");
let room = server.sync_joined_room(&client, room_id).await;
@@ -2520,7 +2274,6 @@ async fn test_dont_remove_only_gap() {
let client = server.client_builder().build().await;
client.event_cache().subscribe().unwrap();
client.event_cache().enable_storage().unwrap();
let room_id = room_id!("!galette:saucisse.bzh");
let room = server
@@ -2581,7 +2334,6 @@ async fn test_clear_all_rooms() {
.await;
client.event_cache().subscribe().unwrap();
client.event_cache().enable_storage().unwrap();
// Another room gets a live event: it's loaded in the event cache now, while
// sleeping_room_id is not.
@@ -2669,7 +2421,6 @@ async fn test_sync_while_back_paginate() {
let room = client.get_room(room_id).unwrap();
client.event_cache().subscribe().unwrap();
client.event_cache().enable_storage().unwrap();
let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
let (initial_events, mut subscriber) = room_event_cache.subscribe().await;
@@ -2721,7 +2472,7 @@ async fn test_sync_while_back_paginate() {
assert_event_matches_msg(&values[1], "sync2");
assert_event_matches_msg(&values[2], "sync3");
// Back pagination should succeed, and we don't have reached the start.
// Back pagination should succeed, and we haven't reached the start.
let outcome = back_pagination_handle.await.unwrap();
assert!(outcome.reached_start.not());
assert_eq!(outcome.events.len(), 3);

View File

@@ -612,7 +612,6 @@ async fn test_event() {
let cache = client.event_cache();
let _ = cache.subscribe();
cache.enable_storage().unwrap();
let room = server
.sync_room(
@@ -674,9 +673,7 @@ async fn test_event_with_context() {
let next_event_id = event_id!("$next_1234");
let (client, server) = logged_in_client_with_server().await;
let cache = client.event_cache();
let _ = cache.subscribe();
cache.enable_storage().unwrap();
client.event_cache().subscribe().unwrap();
let sync_settings = SyncSettings::new().timeout(Duration::from_millis(3000));

View File

@@ -27,7 +27,6 @@ async fn test_forget_non_direct_room() {
let event_cache = client.event_cache();
event_cache.subscribe().unwrap();
event_cache.enable_storage().unwrap();
Mock::given(method("POST"))
.and(path_regex(r"^/_matrix/client/r0/rooms/.*/forget$"))
@@ -84,7 +83,6 @@ async fn test_forget_banned_room() {
let event_cache = client.event_cache();
event_cache.subscribe().unwrap();
event_cache.enable_storage().unwrap();
Mock::given(method("POST"))
.and(path_regex(r"^/_matrix/client/r0/rooms/.*/forget$"))

View File

@@ -112,7 +112,6 @@ async fn main() -> Result<()> {
let event_cache = client.event_cache();
event_cache.subscribe()?;
event_cache.enable_storage()?;
let terminal = ratatui::init();
execute!(stdout(), EnableMouseCapture)?;