feat(sdk): EventCache fully uses RoomEvents/LinkedChunk

feat(sdk): `EventCache` fully uses `RoomEvents`/`LinkedChunk`
This commit is contained in:
Ivan Enderlin
2024-03-27 10:23:42 +01:00
committed by GitHub
2 changed files with 367 additions and 433 deletions

View File

@@ -62,16 +62,17 @@ use ruma::{
use tokio::{
sync::{
broadcast::{error::RecvError, Receiver, Sender},
Mutex, Notify, RwLock,
Mutex, Notify, RwLock, RwLockReadGuard, RwLockWriteGuard,
},
time::timeout,
};
use tracing::{error, instrument, trace, warn};
use self::store::{EventCacheStore, MemoryStore, TimelineEntry};
use crate::{
client::ClientInner, event_cache::store::PaginationToken, room::MessagesOptions, Client, Room,
use self::{
linked_chunk::ChunkContent,
store::{Gap, PaginationToken, RoomEvents},
};
use crate::{client::ClientInner, room::MessagesOptions, Client, Room};
mod linked_chunk;
mod store;
@@ -149,15 +150,14 @@ impl Debug for EventCache {
impl EventCache {
/// Create a new [`EventCache`] for the given client.
pub(crate) fn new(client: &Arc<ClientInner>) -> Self {
let store = Arc::new(MemoryStore::new());
let inner = Arc::new(EventCacheInner {
client: Arc::downgrade(client),
by_room: Default::default(),
store: Arc::new(Mutex::new(store)),
drop_handles: Default::default(),
});
Self { inner }
Self {
inner: Arc::new(EventCacheInner {
client: Arc::downgrade(client),
multiple_room_updates_lock: Default::default(),
by_room: Default::default(),
drop_handles: Default::default(),
}),
}
}
/// Starts subscribing the [`EventCache`] to sync responses, if not done
@@ -205,14 +205,7 @@ impl EventCache {
// Forget everything we know; we could have missed events, and we have
// no way to reconcile at the moment!
// TODO: implement Smart Matching™,
let store = inner.store.lock().await;
let mut by_room = inner.by_room.write().await;
for room_id in by_room.keys() {
if let Err(err) = store.clear_room(room_id).await {
error!("unable to clear room after room updates lag: {err}");
}
}
by_room.clear();
inner.by_room.write().await.clear();
}
Err(RecvError::Closed) => {
@@ -256,15 +249,10 @@ impl EventCache {
// We could have received events during a previous sync; remove them all, since
// we can't know where to insert the "initial events" with respect to
// them.
let store = self.inner.store.lock().await;
store.clear_room(room_id).await?;
let _ = room_cache.inner.sender.send(RoomEventCacheUpdate::Clear);
room_cache
.inner
.append_events(
&**store,
.replace_all_events_by(
events,
prev_batch,
Default::default(),
@@ -282,17 +270,16 @@ struct EventCacheInner {
/// on the owning client.
client: Weak<ClientInner>,
/// Lazily-filled cache of live [`RoomEventCache`], once per room.
by_room: RwLock<BTreeMap<OwnedRoomId, RoomEventCache>>,
/// Backend used for storage.
/// A lock used when many rooms must be updated at once.
///
/// [`Mutex`] is “fair”, as it is implemented as a FIFO. It is important to
/// ensure that multiple updates will be applied in the correct order, which
/// is enforced by taking the store lock when handling an update.
///
/// TODO: replace with a cross-process lock
store: Arc<Mutex<Arc<dyn EventCacheStore>>>,
/// is enforced by taking this lock when handling an update.
// TODO: that's the place to add a cross-process lock!
multiple_room_updates_lock: Mutex<()>,
/// Lazily-filled cache of live [`RoomEventCache`], once per room.
by_room: RwLock<BTreeMap<OwnedRoomId, RoomEventCache>>,
/// Handles to keep alive the task listening to updates.
drop_handles: OnceLock<Arc<EventCacheDropHandles>>,
@@ -308,7 +295,7 @@ impl EventCacheInner {
async fn handle_room_updates(&self, updates: RoomUpdates) -> Result<()> {
// First, take the lock that indicates we're processing updates, to avoid
// handling multiple updates concurrently.
let store = self.store.lock().await;
let _lock = self.multiple_room_updates_lock.lock().await;
// Left rooms.
for (room_id, left_room_update) in updates.leave {
@@ -317,7 +304,7 @@ impl EventCacheInner {
continue;
};
if let Err(err) = room.inner.handle_left_room_update(&**store, left_room_update).await {
if let Err(err) = room.inner.handle_left_room_update(left_room_update).await {
// Non-fatal error, try to continue to the next room.
error!("handling left room update: {err}");
}
@@ -330,9 +317,7 @@ impl EventCacheInner {
continue;
};
if let Err(err) =
room.inner.handle_joined_room_update(&**store, joined_room_update).await
{
if let Err(err) = room.inner.handle_joined_room_update(joined_room_update).await {
// Non-fatal error, try to continue to the next room.
error!("handling joined room update: {err}");
}
@@ -371,7 +356,7 @@ impl EventCacheInner {
return Ok(None);
};
let room_event_cache = RoomEventCache::new(room, self.store.clone());
let room_event_cache = RoomEventCache::new(room);
by_room_guard.insert(room_id.to_owned(), room_event_cache.clone());
@@ -397,8 +382,8 @@ impl Debug for RoomEventCache {
impl RoomEventCache {
/// Create a new [`RoomEventCache`] using the given room and store.
fn new(room: Room, store: Arc<Mutex<Arc<dyn EventCacheStore>>>) -> Self {
Self { inner: Arc::new(RoomEventCacheInner::new(room, store)) }
fn new(room: Room) -> Self {
Self { inner: Arc::new(RoomEventCacheInner::new(room)) }
}
/// Subscribe to room updates for this room, after getting the initial list
@@ -408,9 +393,10 @@ impl RoomEventCache {
pub async fn subscribe(
&self,
) -> Result<(Vec<SyncTimelineEvent>, Receiver<RoomEventCacheUpdate>)> {
let store = self.inner.store.lock().await;
let events =
self.inner.events.read().await.events().map(|(_position, item)| item.clone()).collect();
Ok((store.room_events(self.inner.room.room_id()).await?, self.inner.sender.subscribe()))
Ok((events, self.inner.sender.subscribe()))
}
/// Returns the oldest back-pagination token, that is, the one closest to
@@ -447,14 +433,12 @@ struct RoomEventCacheInner {
/// Sender part for subscribers to this room.
sender: Sender<RoomEventCacheUpdate>,
/// Backend used for storage, shared with the parent [`EventCacheInner`].
///
/// See comment there.
store: Arc<Mutex<Arc<dyn EventCacheStore>>>,
/// The Client [`Room`] this event cache pertains to.
room: Room,
/// The events of the room.
events: RwLock<RoomEvents>,
/// A notifier that we received a new pagination token.
pagination_token_notifier: Notify,
@@ -466,24 +450,20 @@ struct RoomEventCacheInner {
impl RoomEventCacheInner {
/// Creates a new cache for a room, and subscribes to room updates, so as
/// to handle new timeline events.
fn new(room: Room, store: Arc<Mutex<Arc<dyn EventCacheStore>>>) -> Self {
fn new(room: Room) -> Self {
let sender = Sender::new(32);
Self {
room,
store,
events: RwLock::new(RoomEvents::default()),
sender,
pagination_lock: Default::default(),
pagination_token_notifier: Default::default(),
}
}
async fn handle_joined_room_update(
&self,
store: &dyn EventCacheStore,
updates: JoinedRoomUpdate,
) -> Result<()> {
async fn handle_joined_room_update(&self, updates: JoinedRoomUpdate) -> Result<()> {
self.handle_timeline(
store,
updates.timeline,
updates.ephemeral.clone(),
updates.account_data,
@@ -495,7 +475,6 @@ impl RoomEventCacheInner {
async fn handle_timeline(
&self,
store: &dyn EventCacheStore,
timeline: Timeline,
ephemeral: Vec<Raw<AnySyncEphemeralRoomEvent>>,
account_data: Vec<Raw<AnyRoomAccountDataEvent>>,
@@ -505,51 +484,99 @@ impl RoomEventCacheInner {
// Ideally we'd try to reconcile existing events against those received in the
// timeline, but we're not there yet. In the meanwhile, clear the
// items from the room. TODO: implement Smart Matching™.
trace!("limited timeline, clearing all previous events");
trace!("limited timeline, clearing all previous events and pushing new events");
// Clear internal state (events, pagination tokens, etc.).
store.clear_room(self.room.room_id()).await?;
self.replace_all_events_by(
timeline.events,
timeline.prev_batch,
account_data,
ephemeral,
ambiguity_changes,
)
.await?;
} else {
// Add all the events to the backend.
trace!("adding new events");
// Propagate to observers.
let _ = self.sender.send(RoomEventCacheUpdate::Clear);
self.append_new_events(
timeline.events,
timeline.prev_batch,
account_data,
ephemeral,
ambiguity_changes,
)
.await?;
}
// Add all the events to the backend.
trace!("adding new events");
self.append_events(
store,
timeline.events,
timeline.prev_batch,
Ok(())
}
async fn handle_left_room_update(&self, updates: LeftRoomUpdate) -> Result<()> {
self.handle_timeline(updates.timeline, Vec::new(), Vec::new(), updates.ambiguity_changes)
.await?;
Ok(())
}
/// Remove existing events, and append a set of events to the room cache and
/// storage, notifying observers.
async fn replace_all_events_by(
&self,
events: Vec<SyncTimelineEvent>,
prev_batch: Option<String>,
account_data: Vec<Raw<AnyRoomAccountDataEvent>>,
ephemeral: Vec<Raw<AnySyncEphemeralRoomEvent>>,
ambiguity_changes: BTreeMap<OwnedEventId, AmbiguityChange>,
) -> Result<()> {
// Acquire the lock.
let mut room_events = self.events.write().await;
// Reset the events.
room_events.reset();
// Propagate to observers.
let _ = self.sender.send(RoomEventCacheUpdate::Clear);
// Push the new events.
self.append_events_locked_impl(
room_events,
events,
prev_batch,
account_data,
ephemeral,
ambiguity_changes,
)
.await?;
Ok(())
}
async fn handle_left_room_update(
&self,
store: &dyn EventCacheStore,
updates: LeftRoomUpdate,
) -> Result<()> {
self.handle_timeline(
store,
updates.timeline,
Vec::new(),
Vec::new(),
updates.ambiguity_changes,
)
.await?;
Ok(())
.await
}
/// Append a set of events to the room cache and storage, notifying
/// observers.
async fn append_events(
async fn append_new_events(
&self,
store: &dyn EventCacheStore,
events: Vec<SyncTimelineEvent>,
prev_batch: Option<String>,
account_data: Vec<Raw<AnyRoomAccountDataEvent>>,
ephemeral: Vec<Raw<AnySyncEphemeralRoomEvent>>,
ambiguity_changes: BTreeMap<OwnedEventId, AmbiguityChange>,
) -> Result<()> {
self.append_events_locked_impl(
self.events.write().await,
events,
prev_batch,
account_data,
ephemeral,
ambiguity_changes,
)
.await
}
/// Append a set of events, with an attached lock.
///
/// If the lock `room_events` is `None`, one will be created.
///
/// This is a private implementation. It must not be exposed publicly.
async fn append_events_locked_impl(
&self,
mut room_events: RwLockWriteGuard<'_, RoomEvents>,
events: Vec<SyncTimelineEvent>,
prev_batch: Option<String>,
account_data: Vec<Raw<AnyRoomAccountDataEvent>>,
@@ -565,22 +592,18 @@ impl RoomEventCacheInner {
return Ok(());
}
let room_id = self.room.room_id();
// Add the previous back-pagination token (if present), followed by the timeline
// events themselves.
let gap_with_token = prev_batch
.clone()
.map(|val| TimelineEntry::Gap { prev_token: PaginationToken(val) })
.into_iter();
{
if let Some(prev_token) = &prev_batch {
room_events.push_gap(Gap { prev_token: PaginationToken(prev_token.clone()) });
}
store
.append_room_entries(
room_id,
gap_with_token.chain(events.iter().cloned().map(TimelineEntry::Event)).collect(),
)
.await?;
room_events.push_events(events.clone().into_iter());
}
// Now that all events have been added, we can trigger the
// `pagination_token_notifier`.
if prev_batch.is_some() {
self.pagination_token_notifier.notify_one();
}
@@ -610,13 +633,7 @@ impl RoomEventCacheInner {
// Make sure there's at most one back-pagination request.
let _guard = self.pagination_lock.lock().await;
if let Some(token) = token.as_ref() {
let store = self.store.lock().await;
if !store.contains_gap(self.room.room_id(), token).await? {
return Err(EventCacheError::UnknownBackpaginationToken);
}
}
// Get messages.
let messages = self
.room
.messages(assign!(MessagesOptions::backward(), {
@@ -626,10 +643,32 @@ impl RoomEventCacheInner {
.await
.map_err(EventCacheError::SdkError)?;
// Make sure the `RoomEvents` isn't updated while we are saving events from
// backpagination.
let mut room_events = self.events.write().await;
// Check that the `token` exists if any.
let gap_identifier = if let Some(token) = token.as_ref() {
let gap_identifier = room_events.chunk_identifier(|chunk| {
matches!(chunk.content(), ChunkContent::Gap(Gap { ref prev_token }) if prev_token == token)
});
// The method has been called with `token` but it doesn't exist in `RoomEvents`,
// it's an error.
if gap_identifier.is_none() {
return Ok(BackPaginationOutcome::UnknownBackpaginationToken);
}
gap_identifier
} else {
None
};
// Would we want to backpaginate again, we'd start from the `end` token as the
// next `from` token.
let prev_token = messages.end;
let prev_token =
messages.end.map(|prev_token| Gap { prev_token: PaginationToken(prev_token) });
// If this token is missing, then we've reached the end of the timeline.
let reached_start = prev_token.is_none();
@@ -640,41 +679,78 @@ impl RoomEventCacheInner {
// should be prepended first).
let events = messages.chunk;
// Prepend the previous token (if any) at the beginning of the timeline,
// followed by the events received in the response (in reverse order).
let new_gap = prev_token
.map(|token| TimelineEntry::Gap { prev_token: PaginationToken(token) })
.into_iter();
let sync_events = events
.iter()
// Reverse the order of the events as `/messages` has been called with `dir=b`
// (backward). The `RoomEvents` API expects the first event to be the oldest.
.rev()
.cloned()
.map(SyncTimelineEvent::from);
// For storage, reverse events to store them in the normal (non-reversed order).
//
// It's fine to convert from `TimelineEvent` (i.e. that has a room id) to
// `SyncTimelineEvent` (i.e. that doesn't have it), because those events are
// always tied to a room in storage anyways.
let new_events = events.iter().rev().map(|ev| TimelineEntry::Event(ev.clone().into()));
// There is a `token`/gap, let's replace it by new events!
if let Some(gap_identifier) = gap_identifier {
let new_position = {
// Replace the gap by new events.
let new_chunk = room_events
.replace_gap_at(sync_events, gap_identifier)
// SAFETY: we are sure that `gap_identifier` represents a valid
// `ChunkIdentifier` for a `Gap` chunk.
.expect("The `gap_identifier` must represent a `Gap`");
let replaced = self
.store
.lock()
.await
.replace_gap(self.room.room_id(), token.as_ref(), new_gap.chain(new_events).collect())
.await?;
new_chunk.first_position()
};
// And insert a new gap if there is any `prev_token`.
if let Some(prev_token_gap) = prev_token {
room_events
.insert_gap_at(prev_token_gap, new_position)
// SAFETY: we are sure that `new_position` represents a valid
// `ChunkIdentifier` for an `Item` chunk.
.expect("The `new_position` must represent an `Item`");
}
if !replaced {
// The previous token disappeared!
// This can happen if we got a limited timeline and lost track of our pagination
// token, because the whole timeline has been reset.
//
// TODO: With smarter reconciliation, this might get away. In the meanwhile,
// early return and forget about all the events.
trace!("gap was missing, likely because we observed a gappy sync response");
Ok(BackPaginationOutcome::UnknownBackpaginationToken)
} else {
trace!("replaced gap with new events from backpagination");
// TODO: implement smarter reconciliation later
//let _ = self.sender.send(RoomEventCacheUpdate::Prepend { events });
Ok(BackPaginationOutcome::Success { events, reached_start })
} else {
// There is no `token`/gap identifier. Let's assume we must prepend the new
// events.
let first_item_position =
room_events.events().next().map(|(item_position, _)| item_position);
match first_item_position {
// Is there a first item? Insert at this position.
Some(first_item_position) => {
if let Some(prev_token_gap) = prev_token {
room_events
.insert_gap_at(prev_token_gap, first_item_position)
// SAFETY: The `first_item_position` can only be an `Item` chunk, it's
// an invariant of `LinkedChunk`. Also, it can only represent a valid
// `ChunkIdentifier` as the data structure isn't modified yet.
.expect("The `first_item_position` must represent a valid `Item`");
}
room_events
.insert_events_at(sync_events, first_item_position)
// SAFETY: The `first_item_position` can only be an `Item` chunk, it's
// an invariant of `LinkedChunk`. The chunk it points to has not been
// removed.
.expect("The `first_item_position` must represent an `Item`");
}
// There is no first item. Let's simply push.
None => {
if let Some(prev_token_gap) = prev_token {
room_events.push_gap(prev_token_gap);
}
room_events.push_events(sync_events);
}
}
Ok(BackPaginationOutcome::Success { events, reached_start })
}
}
@@ -689,14 +765,19 @@ impl RoomEventCacheInner {
max_wait: Option<Duration>,
) -> Result<Option<PaginationToken>> {
// Optimistically try to return the backpagination token immediately.
if let Some(token) =
self.store.lock().await.oldest_backpagination_token(self.room.room_id()).await?
{
fn get_oldest(room_events: RwLockReadGuard<'_, RoomEvents>) -> Option<PaginationToken> {
room_events.chunks().find_map(|chunk| match chunk.content() {
ChunkContent::Gap(gap) => Some(gap.prev_token.clone()),
ChunkContent::Items(..) => None,
})
}
if let Some(token) = get_oldest(self.events.read().await) {
return Ok(Some(token));
}
let Some(max_wait) = max_wait else {
// We had no token and no time to wait, so... no tokens.
// We had no token and no time to wait, so no tokens.
return Ok(None);
};
@@ -704,7 +785,7 @@ impl RoomEventCacheInner {
// Timeouts are fine, per this function's contract.
let _ = timeout(max_wait, self.pagination_token_notifier.notified()).await;
self.store.lock().await.oldest_backpagination_token(self.room.room_id()).await
Ok(get_oldest(self.events.read().await))
}
}
@@ -758,13 +839,12 @@ pub enum RoomEventCacheUpdate {
#[cfg(test)]
mod tests {
use assert_matches2::assert_matches;
use matrix_sdk_common::executor::spawn;
use matrix_sdk_test::{async_test, sync_timeline_event};
use ruma::room_id;
use super::{store::TimelineEntry, EventCacheError};
use super::{BackPaginationOutcome, EventCacheError};
use crate::{event_cache::store::PaginationToken, test_utils::logged_in_client};
#[async_test]
@@ -783,70 +863,84 @@ mod tests {
assert_matches!(result, Err(EventCacheError::NotSubscribedYet));
}
#[async_test]
async fn test_unknown_pagination_token() {
let client = logged_in_client(None).await;
let room_id = room_id!("!galette:saucisse.bzh");
client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
client.event_cache().subscribe().unwrap();
let (room_event_cache, _drop_handles) =
client.event_cache().for_room(room_id).await.unwrap();
let room_event_cache = room_event_cache.unwrap();
// If I try to back-paginate with an unknown back-pagination token,
let token = PaginationToken("old".to_owned());
// Then I run into an error.
let res = room_event_cache.backpaginate(20, Some(token)).await;
assert_matches!(res.unwrap_err(), EventCacheError::UnknownBackpaginationToken);
}
// Those tests require time to work, and it does not on wasm32.
#[cfg(not(target_arch = "wasm32"))]
mod time_tests {
use std::time::{Duration, Instant};
use matrix_sdk_base::RoomState;
use serde_json::json;
use tokio::time::sleep;
use wiremock::{
matchers::{header, method, path_regex, query_param},
Mock, ResponseTemplate,
};
use super::*;
use super::{super::store::Gap, *};
use crate::test_utils::logged_in_client_with_server;
#[async_test]
async fn test_wait_no_pagination_token() {
let client = logged_in_client(None).await;
async fn test_unknown_pagination_token() {
let (client, server) = logged_in_client_with_server().await;
let room_id = room_id!("!galette:saucisse.bzh");
client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
client.event_cache().subscribe().unwrap();
// When I only have events in a room,
client
.event_cache()
.inner
.store
.lock()
.await
.append_room_entries(
room_id,
vec![TimelineEntry::Event(
sync_timeline_event!({
"sender": "b@z.h",
"type": "m.room.message",
"event_id": "$ida",
"origin_server_ts": 12344446,
"content": { "body":"yolo", "msgtype": "m.text" },
})
.into(),
)],
)
.await
.unwrap();
let (room_event_cache, _drop_handlers) =
let (room_event_cache, _drop_handles) =
client.event_cache().for_room(room_id).await.unwrap();
let room_event_cache = room_event_cache.unwrap();
// If I try to back-paginate with an unknown back-pagination token,
let token_name = "unknown";
let token = PaginationToken(token_name.to_owned());
// Then I run into an error.
Mock::given(method("GET"))
.and(path_regex(r"^/_matrix/client/r0/rooms/.*/messages$"))
.and(header("authorization", "Bearer 1234"))
.and(query_param("from", token_name))
.respond_with(ResponseTemplate::new(200).set_body_json(json!({
"start": token_name,
"chunk": [],
})))
.expect(1)
.mount(&server)
.await;
let res = room_event_cache.backpaginate(20, Some(token)).await;
assert_matches!(res, Ok(BackPaginationOutcome::UnknownBackpaginationToken));
server.verify().await
}
#[async_test]
async fn test_wait_no_pagination_token() {
let client = logged_in_client(None).await;
let room_id = room_id!("!galette:saucisse.bzh");
client.base_client().get_or_create_room(room_id, RoomState::Joined);
let event_cache = client.event_cache();
event_cache.subscribe().unwrap();
let (room_event_cache, _drop_handlers) = event_cache.for_room(room_id).await.unwrap();
let room_event_cache = room_event_cache.unwrap();
// When I only have events in a room,
{
let mut room_events = room_event_cache.inner.events.write().await;
room_events.push_events([sync_timeline_event!({
"sender": "b@z.h",
"type": "m.room.message",
"event_id": "$ida",
"origin_server_ts": 12344446,
"content": { "body":"yolo", "msgtype": "m.text" },
})
.into()]);
}
// If I don't wait for the backpagination token,
let found = room_event_cache.oldest_backpagination_token(None).await.unwrap();
// Then I don't find it.
@@ -884,39 +978,28 @@ mod tests {
let room_id = room_id!("!galette:saucisse.bzh");
client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
client.event_cache().subscribe().unwrap();
let event_cache = client.event_cache();
let (room_event_cache, _drop_handles) =
client.event_cache().for_room(room_id).await.unwrap();
event_cache.subscribe().unwrap();
let (room_event_cache, _drop_handlers) = event_cache.for_room(room_id).await.unwrap();
let room_event_cache = room_event_cache.unwrap();
let expected_token = PaginationToken("old".to_owned());
// When I have events and multiple gaps, in a room,
client
.event_cache()
.inner
.store
.lock()
.await
.append_room_entries(
room_id,
vec![
TimelineEntry::Gap { prev_token: expected_token.clone() },
TimelineEntry::Event(
sync_timeline_event!({
"sender": "b@z.h",
"type": "m.room.message",
"event_id": "$ida",
"origin_server_ts": 12344446,
"content": { "body":"yolo", "msgtype": "m.text" },
})
.into(),
),
],
)
.await
.unwrap();
{
let mut room_events = room_event_cache.inner.events.write().await;
room_events.push_gap(Gap { prev_token: expected_token.clone() });
room_events.push_events([sync_timeline_event!({
"sender": "b@z.h",
"type": "m.room.message",
"event_id": "$ida",
"origin_server_ts": 12344446,
"content": { "body":"yolo", "msgtype": "m.text" },
})
.into()]);
}
// If I don't wait for a back-pagination token,
let found = room_event_cache.oldest_backpagination_token(None).await.unwrap();
@@ -954,32 +1037,26 @@ mod tests {
let room_id = room_id!("!galette:saucisse.bzh");
client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
client.event_cache().subscribe().unwrap();
let event_cache = client.event_cache();
let (room_event_cache, _drop_handles) =
client.event_cache().for_room(room_id).await.unwrap();
event_cache.subscribe().unwrap();
let (room_event_cache, _drop_handles) = event_cache.for_room(room_id).await.unwrap();
let room_event_cache = room_event_cache.unwrap();
let expected_token = PaginationToken("old".to_owned());
let before = Instant::now();
let cloned_expected_token = expected_token.clone();
let cloned_room_event_cache = room_event_cache.clone();
let insert_token_task = spawn(async move {
// If a backpagination token is inserted after 400 milliseconds,
sleep(Duration::from_millis(400)).await;
client
.event_cache()
.inner
.store
.lock()
.await
.append_room_entries(
room_id,
vec![TimelineEntry::Gap { prev_token: cloned_expected_token }],
)
.await
.unwrap();
{
let mut room_events = cloned_room_event_cache.inner.events.write().await;
room_events.push_gap(Gap { prev_token: cloned_expected_token });
}
});
// Then first I don't get it (if I'm not waiting,)

View File

@@ -12,199 +12,19 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::{collections::BTreeMap, fmt, iter::once, result::Result as StdResult};
use std::{fmt, iter::once};
use async_trait::async_trait;
use matrix_sdk_common::deserialized_responses::SyncTimelineEvent;
use ruma::{OwnedRoomId, RoomId};
use tokio::sync::RwLock;
use super::{
linked_chunk::{
Chunk, ChunkIdentifier, LinkedChunk, LinkedChunkError, LinkedChunkIter,
LinkedChunkIterBackward, Position,
},
Result,
use super::linked_chunk::{
Chunk, ChunkIdentifier, LinkedChunk, LinkedChunkError, LinkedChunkIter,
LinkedChunkIterBackward, Position,
};
/// A store that can be remember information about the event cache.
///
/// It really acts as a cache, in the sense that clearing the backing data
/// should not have any irremediable effect, other than providing a lesser user
/// experience.
#[async_trait]
pub trait EventCacheStore: Send + Sync {
/// Returns all the known events for the given room.
async fn room_events(&self, room: &RoomId) -> Result<Vec<SyncTimelineEvent>>;
/// Adds all the entries to the given room's timeline.
async fn append_room_entries(&self, room: &RoomId, entries: Vec<TimelineEntry>) -> Result<()>;
/// Returns whether the store knows about the given pagination token.
async fn contains_gap(&self, room: &RoomId, pagination_token: &PaginationToken)
-> Result<bool>;
/// Replaces a given gap (identified by its pagination token) with the given
/// entries.
///
/// Note: if the gap hasn't been found, then nothing happens, and the events
/// are lost.
///
/// Returns whether the gap was found.
async fn replace_gap(
&self,
room: &RoomId,
gap_id: Option<&PaginationToken>,
entries: Vec<TimelineEntry>,
) -> Result<bool>;
/// Retrieve the oldest backpagination token for the given room.
async fn oldest_backpagination_token(&self, room: &RoomId) -> Result<Option<PaginationToken>>;
/// Clear all the information tied to a given room.
///
/// This forgets the following:
/// - events in the room
/// - pagination tokens
async fn clear_room(&self, room: &RoomId) -> Result<()>;
}
/// A newtype wrapper for a pagination token returned by a /messages response.
#[derive(Clone, Debug, PartialEq)]
pub struct PaginationToken(pub String);
#[derive(Clone)]
pub enum TimelineEntry {
Event(SyncTimelineEvent),
Gap {
/// The token to use in the query, extracted from a previous "from" /
/// "end" field of a `/messages` response.
prev_token: PaginationToken,
},
}
/// All the information related to a room and stored in the event cache.
#[derive(Default)]
struct RoomInfo {
/// All the timeline entries per room, in sync order.
entries: Vec<TimelineEntry>,
}
impl RoomInfo {
fn clear(&mut self) {
self.entries.clear();
}
}
/// An [`EventCacheStore`] implementation that keeps all the information in
/// memory.
#[derive(Default)]
pub(crate) struct MemoryStore {
by_room: RwLock<BTreeMap<OwnedRoomId, RoomInfo>>,
}
impl MemoryStore {
/// Create a new empty [`MemoryStore`].
pub fn new() -> Self {
Default::default()
}
}
#[async_trait]
impl EventCacheStore for MemoryStore {
async fn room_events(&self, room: &RoomId) -> Result<Vec<SyncTimelineEvent>> {
Ok(self
.by_room
.read()
.await
.get(room)
.map(|room_info| {
room_info
.entries
.iter()
.filter_map(
|entry| if let TimelineEntry::Event(ev) = entry { Some(ev) } else { None },
)
.cloned()
.collect()
})
.unwrap_or_default())
}
async fn append_room_entries(&self, room: &RoomId, entries: Vec<TimelineEntry>) -> Result<()> {
self.by_room.write().await.entry(room.to_owned()).or_default().entries.extend(entries);
Ok(())
}
async fn clear_room(&self, room: &RoomId) -> Result<()> {
// Clear the room, so as to avoid reallocations if the room is being reused.
// XXX: do we also want an actual way to *remove* a room? (for left rooms)
if let Some(room) = self.by_room.write().await.get_mut(room) {
room.clear();
}
Ok(())
}
async fn oldest_backpagination_token(&self, room: &RoomId) -> Result<Option<PaginationToken>> {
Ok(self.by_room.read().await.get(room).and_then(|room| {
room.entries.iter().find_map(|entry| {
if let TimelineEntry::Gap { prev_token: backpagination_token } = entry {
Some(backpagination_token.clone())
} else {
None
}
})
}))
}
async fn contains_gap(&self, room: &RoomId, needle: &PaginationToken) -> Result<bool> {
let mut by_room_guard = self.by_room.write().await;
let room = by_room_guard.entry(room.to_owned()).or_default();
Ok(room.entries.iter().any(|entry| {
if let TimelineEntry::Gap { prev_token: existing } = entry {
existing == needle
} else {
false
}
}))
}
async fn replace_gap(
&self,
room: &RoomId,
token: Option<&PaginationToken>,
entries: Vec<TimelineEntry>,
) -> Result<bool> {
let mut by_room_guard = self.by_room.write().await;
let room = by_room_guard.entry(room.to_owned()).or_default();
if let Some(token) = token {
let gap_pos = room.entries.iter().enumerate().find_map(|(i, t)| {
if let TimelineEntry::Gap { prev_token: existing } = t {
if existing == token {
return Some(i);
}
}
None
});
if let Some(pos) = gap_pos {
room.entries.splice(pos..pos + 1, entries);
Ok(true)
} else {
Ok(false)
}
} else {
// We had no previous token: assume we can prepend the events.
room.entries.splice(0..0, entries);
Ok(true)
}
}
}
#[derive(Debug)]
pub struct Gap {
/// The token to use in the query, extracted from a previous "from" /
@@ -230,6 +50,11 @@ impl RoomEvents {
Self { chunks: LinkedChunk::new() }
}
/// Clear all events.
pub fn reset(&mut self) {
self.chunks = LinkedChunk::new();
}
/// Return the number of events.
pub fn len(&self) -> usize {
self.chunks.len()
@@ -240,7 +65,7 @@ impl RoomEvents {
self.push_events(once(event))
}
/// Push events after existing events.
/// Push events after all events or gaps.
///
/// The last event in `events` is the most recent one.
pub fn push_events<I>(&mut self, events: I)
@@ -251,12 +76,17 @@ impl RoomEvents {
self.chunks.push_items_back(events)
}
/// Push a gap after all events or gaps.
pub fn push_gap(&mut self, gap: Gap) {
self.chunks.push_gap_back(gap)
}
/// Insert events at a specified position.
pub fn insert_events_at<I>(
&mut self,
events: I,
position: Position,
) -> StdResult<(), LinkedChunkError>
) -> Result<(), LinkedChunkError>
where
I: IntoIterator<Item = SyncTimelineEvent>,
I::IntoIter: ExactSizeIterator,
@@ -265,14 +95,29 @@ impl RoomEvents {
}
/// Insert a gap at a specified position.
pub fn insert_gap_at(
&mut self,
gap: Gap,
position: Position,
) -> StdResult<(), LinkedChunkError> {
pub fn insert_gap_at(&mut self, gap: Gap, position: Position) -> Result<(), LinkedChunkError> {
self.chunks.insert_gap_at(gap, position)
}
/// Replace the gap identified by `gap_identifier`, by events.
///
/// Because the `gap_identifier` can represent non-gap chunk, this method
/// returns a `Result`.
///
/// This method returns a reference to the (first if many) newly created
/// `Chunk` that contains the `items`.
pub fn replace_gap_at<I>(
&mut self,
events: I,
gap_identifier: ChunkIdentifier,
) -> Result<&Chunk<SyncTimelineEvent, Gap, DEFAULT_CHUNK_CAPACITY>, LinkedChunkError>
where
I: IntoIterator<Item = SyncTimelineEvent>,
I::IntoIter: ExactSizeIterator,
{
self.chunks.replace_gap_at(events, gap_identifier)
}
/// Search for a chunk, and return its identifier.
pub fn chunk_identifier<'a, P>(&'a self, predicate: P) -> Option<ChunkIdentifier>
where
@@ -298,11 +143,18 @@ impl RoomEvents {
self.chunks.rchunks()
}
/// Iterate over the chunks, forward.
///
/// The oldest chunk comes first.
pub fn chunks(&self) -> LinkedChunkIter<'_, SyncTimelineEvent, Gap, DEFAULT_CHUNK_CAPACITY> {
self.chunks.chunks()
}
/// Iterate over the chunks, starting from `identifier`, backward.
pub fn rchunks_from(
&self,
identifier: ChunkIdentifier,
) -> StdResult<
) -> Result<
LinkedChunkIterBackward<'_, SyncTimelineEvent, Gap, DEFAULT_CHUNK_CAPACITY>,
LinkedChunkError,
> {
@@ -314,10 +166,8 @@ impl RoomEvents {
pub fn chunks_from(
&self,
identifier: ChunkIdentifier,
) -> StdResult<
LinkedChunkIter<'_, SyncTimelineEvent, Gap, DEFAULT_CHUNK_CAPACITY>,
LinkedChunkError,
> {
) -> Result<LinkedChunkIter<'_, SyncTimelineEvent, Gap, DEFAULT_CHUNK_CAPACITY>, LinkedChunkError>
{
self.chunks.chunks_from(identifier)
}
@@ -328,11 +178,18 @@ impl RoomEvents {
self.chunks.ritems()
}
/// Iterate over the events, forward.
///
/// The oldest event comes first.
pub fn events(&self) -> impl Iterator<Item = (Position, &SyncTimelineEvent)> {
self.chunks.items()
}
/// Iterate over the events, starting from `position`, backward.
pub fn revents_from(
&self,
position: Position,
) -> StdResult<impl Iterator<Item = (Position, &SyncTimelineEvent)>, LinkedChunkError> {
) -> Result<impl Iterator<Item = (Position, &SyncTimelineEvent)>, LinkedChunkError> {
self.chunks.ritems_from(position)
}
@@ -341,13 +198,13 @@ impl RoomEvents {
pub fn events_from(
&self,
position: Position,
) -> StdResult<impl Iterator<Item = (Position, &SyncTimelineEvent)>, LinkedChunkError> {
) -> Result<impl Iterator<Item = (Position, &SyncTimelineEvent)>, LinkedChunkError> {
self.chunks.items_from(position)
}
}
impl fmt::Debug for RoomEvents {
fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> StdResult<(), fmt::Error> {
fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
formatter.debug_struct("RoomEvents").field("chunk", &self.chunks).finish()
}
}