feat(event cache): don't add a previous gap if all events were deduplicated, after sync

This commit is contained in:
Benjamin Bouvier
2024-12-17 16:15:29 +01:00
parent 3f0712010f
commit bcb9a86a00
2 changed files with 102 additions and 2 deletions

View File

@@ -36,7 +36,7 @@ use tokio::sync::{
broadcast::{Receiver, Sender},
Notify, RwLock, RwLockReadGuard, RwLockWriteGuard,
};
use tracing::{trace, warn};
use tracing::{debug, trace, warn};
use super::{
paginator::{Paginator, PaginatorState},
@@ -277,6 +277,10 @@ impl RoomEventCacheInner {
}
fn handle_account_data(&self, account_data: Vec<Raw<AnyRoomAccountDataEvent>>) {
if account_data.is_empty() {
return;
}
let mut handled_read_marker = false;
trace!("Handling account data");
@@ -536,7 +540,22 @@ impl RoomEventCacheInner {
room_events.push_gap(Gap { prev_token: prev_token.clone() });
}
room_events.push_events(sync_timeline_events.clone());
let add_event_report = room_events.push_events(sync_timeline_events.clone());
if add_event_report.deduplicated_all_new_events() {
debug!(
"not storing previous batch token, because we deduplicated all new sync events"
);
// Remove the gap we just inserted.
let prev_gap_id = room_events
.rchunks()
.find_map(|c| c.is_gap().then_some(c.identifier()))
.expect("we just inserted the gap beforehand");
room_events
.replace_gap_at([], prev_gap_id)
.expect("we obtained the valid position beforehand");
}
})
.await?;

View File

@@ -1019,3 +1019,84 @@ async fn test_backpaginate_replace_empty_gap() {
assert_event_matches_msg(&events[1], "world");
assert_eq!(events.len(), 2);
}
#[async_test]
async fn test_no_gap_stored_after_deduplicated_sync() {
let server = MatrixMockServer::new().await;
let client = server.client_builder().build().await;
let event_cache = client.event_cache();
// Immediately subscribe the event cache to sync updates.
event_cache.subscribe().unwrap();
event_cache.enable_storage().unwrap();
let room_id = room_id!("!omelette:fromage.fr");
let f = EventFactory::new().room(room_id).sender(user_id!("@a:b.c"));
let initial_events = vec![
f.text_msg("hello").event_id(event_id!("$1")).into_raw_sync(),
f.text_msg("world").event_id(event_id!("$2")).into_raw_sync(),
f.text_msg("sup").event_id(event_id!("$3")).into_raw_sync(),
];
// Start with a room with a few events, limited timeline and prev-batch token.
let room = server
.sync_room(
&client,
JoinedRoomBuilder::new(room_id)
.add_timeline_bulk(initial_events.clone())
.set_timeline_limited()
.set_timeline_prev_batch("prev-batch".to_owned()),
)
.await;
let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
let (events, mut stream) = room_event_cache.subscribe().await.unwrap();
if events.is_empty() {
let update = stream.recv().await.expect("read error");
assert_matches!(update, RoomEventCacheUpdate::AddTimelineEvents { .. });
}
drop(events);
// Backpagination will return nothing.
server
.mock_room_messages()
.ok("start-token-unused1".to_owned(), None, Vec::<Raw<AnyTimelineEvent>>::new(), Vec::new())
.mock_once()
.mount()
.await;
let pagination = room_event_cache.pagination();
// Run pagination once: it will consume the unique gap we had.
pagination.run_backwards(20, once).await.unwrap();
// Now simulate that the sync returns the same events (which can happen with
// simplified sliding sync).
server
.sync_room(
&client,
JoinedRoomBuilder::new(room_id)
.add_timeline_bulk(initial_events)
.set_timeline_limited()
.set_timeline_prev_batch("prev-batch".to_owned()),
)
.await;
let update = stream.recv().await.expect("read error");
assert_matches!(update, RoomEventCacheUpdate::AddTimelineEvents { .. });
// If this back-pagination fails, that's because we've stored a gap that's
// useless. It should be short-circuited because there's no previous gap.
let outcome = pagination.run_backwards(20, once).await.unwrap();
assert!(outcome.reached_start);
let (events, _stream) = room_event_cache.subscribe().await.unwrap();
assert_event_matches_msg(&events[0], "hello");
assert_event_matches_msg(&events[1], "world");
assert_event_matches_msg(&events[2], "sup");
assert_eq!(events.len(), 3);
}