sliding_sync: Trigger room list update when room info changes

This fixes https://github.com/element-hq/element-x-ios/issues/1847

Signed-off-by: Timo Kösters <timo@koesters.xyz>
This commit is contained in:
Timo Kösters
2024-01-15 12:29:36 +01:00
committed by Benjamin Bouvier
parent d84387d12e
commit d2b02ec2e8
9 changed files with 123 additions and 23 deletions

View File

@@ -213,7 +213,10 @@ impl RoomList {
listener: Box<dyn RoomListEntriesListener>,
) -> RoomListEntriesWithDynamicAdaptersResult {
let (entries_stream, dynamic_entries_controller) =
self.inner.entries_with_dynamic_adapters(page_size.try_into().unwrap());
self.inner.entries_with_dynamic_adapters(
page_size.try_into().unwrap(),
self.room_list_service.inner.client().clone(),
);
RoomListEntriesWithDynamicAdaptersResult {
controller: Arc::new(RoomListDynamicEntriesController::new(

View File

@@ -16,6 +16,7 @@
use std::{
collections::{BTreeMap, BTreeSet},
fmt, iter,
sync::RwLock as StdRwLock,
};
#[cfg(feature = "e2e-encryption")]
use std::{ops::Deref, sync::Arc};
@@ -50,6 +51,7 @@ use ruma::{
serde::Raw,
OwnedRoomId, OwnedUserId, RoomId, RoomVersionId, UInt, UserId,
};
use tokio::sync::broadcast;
use tokio::sync::Mutex;
#[cfg(feature = "e2e-encryption")]
use tokio::sync::{RwLock, RwLockReadGuard};
@@ -94,6 +96,7 @@ pub struct BaseClient {
olm_machine: Arc<RwLock<Option<OlmMachine>>>,
/// Observable of when a user is ignored/unignored.
pub(crate) ignore_user_list_changes: SharedObservable<()>,
pub(crate) roominfo_update_sender: Arc<StdRwLock<Option<broadcast::Sender<OwnedRoomId>>>>,
}
#[cfg(not(tarpaulin_include))]
@@ -126,9 +129,16 @@ impl BaseClient {
#[cfg(feature = "e2e-encryption")]
olm_machine: Default::default(),
ignore_user_list_changes: Default::default(),
roominfo_update_sender: Arc::new(StdRwLock::new(None)),
}
}
/// Replaces the sender that triggers updates when room info changes. This
/// should only be used once at initialization.
pub fn set_roominfo_update_sender(&self, sender: broadcast::Sender<OwnedRoomId>) {
*self.roominfo_update_sender.write().unwrap() = Some(sender);
}
/// Clones the current base client to use the same crypto store but a
/// different, in-memory store config, and resets transient state.
pub fn clone_with_in_memory_state_store(&self) -> Self {
@@ -162,7 +172,7 @@ impl BaseClient {
/// Lookup the Room for the given RoomId, or create one, if it didn't exist
/// yet in the store
pub fn get_or_create_room(&self, room_id: &RoomId, room_state: RoomState) -> Room {
self.store.get_or_create_room(room_id, room_state)
self.store.get_or_create_room(room_id, room_state, &self)
}
/// Get all the rooms this client knows about.
@@ -195,7 +205,7 @@ impl BaseClient {
/// This method panics if it is called twice.
pub async fn set_session_meta(&self, session_meta: SessionMeta) -> Result<()> {
debug!(user_id = ?session_meta.user_id, device_id = ?session_meta.device_id, "Restoring login");
self.store.set_session_meta(session_meta.clone()).await?;
self.store.set_session_meta(session_meta.clone(), &self).await?;
#[cfg(feature = "e2e-encryption")]
self.regenerate_olm().await?;
@@ -726,7 +736,7 @@ impl BaseClient {
///
/// Update the internal and cached state accordingly. Return the final Room.
pub async fn room_joined(&self, room_id: &RoomId) -> Result<Room> {
let room = self.store.get_or_create_room(room_id, RoomState::Joined);
let room = self.store.get_or_create_room(room_id, RoomState::Joined, &self);
if room.state() != RoomState::Joined {
let _sync_lock = self.sync_lock().lock().await;
@@ -747,7 +757,7 @@ impl BaseClient {
///
/// Update the internal and cached state accordingly.
pub async fn room_left(&self, room_id: &RoomId) -> Result<()> {
let room = self.store.get_or_create_room(room_id, RoomState::Left);
let room = self.store.get_or_create_room(room_id, RoomState::Left, &self);
if room.state() != RoomState::Left {
let _sync_lock = self.sync_lock().lock().await;
@@ -817,7 +827,7 @@ impl BaseClient {
let mut notifications = Default::default();
for (room_id, new_info) in response.rooms.join {
let room = self.store.get_or_create_room(&room_id, RoomState::Joined);
let room = self.store.get_or_create_room(&room_id, RoomState::Joined, &self);
let mut room_info = room.clone_info();
room_info.mark_as_joined();
@@ -925,7 +935,7 @@ impl BaseClient {
}
for (room_id, new_info) in response.rooms.leave {
let room = self.store.get_or_create_room(&room_id, RoomState::Left);
let room = self.store.get_or_create_room(&room_id, RoomState::Left, &self);
let mut room_info = room.clone_info();
room_info.mark_as_left();
room_info.mark_state_partially_synced();
@@ -979,7 +989,7 @@ impl BaseClient {
}
for (room_id, new_info) in response.rooms.invite {
let room = self.store.get_or_create_room(&room_id, RoomState::Invited);
let room = self.store.get_or_create_room(&room_id, RoomState::Invited, &self);
let mut room_info = room.clone_info();
room_info.mark_as_invited();
room_info.mark_state_fully_synced();

View File

@@ -54,6 +54,7 @@ use ruma::{
RoomId, RoomVersionId, UserId,
};
use serde::{Deserialize, Serialize};
use tokio::sync::broadcast;
use tracing::{debug, field::debug, info, instrument, trace, warn};
use super::{
@@ -77,6 +78,7 @@ pub struct Room {
room_id: OwnedRoomId,
own_user_id: OwnedUserId,
inner: SharedObservable<RoomInfo>,
roominfo_update_sender: Option<broadcast::Sender<OwnedRoomId>>,
store: Arc<DynStateStore>,
/// The most recent few encrypted events. When the keys come through to
@@ -144,15 +146,17 @@ impl Room {
store: Arc<DynStateStore>,
room_id: &RoomId,
room_state: RoomState,
roominfo_update_sender: Option<broadcast::Sender<OwnedRoomId>>,
) -> Self {
let room_info = RoomInfo::new(room_id, room_state);
Self::restore(own_user_id, store, room_info)
Self::restore(own_user_id, store, room_info, roominfo_update_sender)
}
pub(crate) fn restore(
own_user_id: &UserId,
store: Arc<DynStateStore>,
room_info: RoomInfo,
roominfo_update_sender: Option<broadcast::Sender<OwnedRoomId>>,
) -> Self {
Self {
own_user_id: own_user_id.into(),
@@ -163,6 +167,7 @@ impl Room {
latest_encrypted_events: Arc::new(SyncRwLock::new(RingBuffer::new(
Self::MAX_ENCRYPTED_EVENTS,
))),
roominfo_update_sender,
}
}
@@ -644,10 +649,14 @@ impl Room {
self.inner.get()
}
/// Update the inner summary with the given RoomInfo, and notify
/// subscribers.
/// Update the summary with the given RoomInfo. This also triggers an update
/// for the roominfo_update_recv receivers.
pub fn set_room_info(&self, room_info: RoomInfo) {
self.inner.set(room_info);
if let Some(sender) = &self.roominfo_update_sender {
// Ignore error if receiver is down
let _ = sender.send(self.room_id.clone());
}
}
/// Get the `RoomMember` with the given `user_id`.
@@ -1615,7 +1624,7 @@ mod tests {
let user_id = user_id!("@me:example.org");
let room_id = room_id!("!test:localhost");
(store.clone(), Room::new(user_id, store, room_id, room_type))
(store.clone(), Room::new(user_id, store, room_id, room_type, None))
}
fn make_stripped_member_event(user_id: &UserId, name: &str) -> Raw<StrippedRoomMemberEvent> {

View File

@@ -460,7 +460,7 @@ impl BaseClient {
room_id: &RoomId,
) -> (Room, RoomInfo, Option<InvitedRoom>) {
if let Some(invite_state) = &room_data.invite_state {
let room = store.get_or_create_room(room_id, RoomState::Invited);
let room = store.get_or_create_room(room_id, RoomState::Invited, &self);
let mut room_info = room.clone_info();
// We don't actually know what events are inside invite_state. In theory, they
@@ -482,7 +482,7 @@ impl BaseClient {
Some(v3::InvitedRoom::from(v3::InviteState::from(invite_state.clone()))),
)
} else {
let room = store.get_or_create_room(room_id, RoomState::Joined);
let room = store.get_or_create_room(room_id, RoomState::Joined, &self);
let mut room_info = room.clone_info();
// We default to considering this room joined if it's not an invite. If it's
@@ -1596,6 +1596,7 @@ mod tests {
Arc::new(MemoryStore::new()),
room_id!("!r:e.co"),
RoomState::Joined,
None,
)
}

View File

@@ -58,7 +58,7 @@ pub type BoxStream<T> = Pin<Box<dyn futures_util::Stream<Item = T> + Send>>;
use crate::{
rooms::{RoomInfo, RoomState},
MinimalRoomMemberEvent, Room, RoomStateFilter, SessionMeta,
BaseClient, MinimalRoomMemberEvent, Room, RoomStateFilter, SessionMeta,
};
pub(crate) mod ambiguity_map;
@@ -171,9 +171,18 @@ impl Store {
/// inner `StateStore`.
///
/// This method panics if it is called twice.
pub async fn set_session_meta(&self, session_meta: SessionMeta) -> Result<()> {
pub async fn set_session_meta(
&self,
session_meta: SessionMeta,
client: &BaseClient,
) -> Result<()> {
for info in self.inner.get_room_infos().await? {
let room = Room::restore(&session_meta.user_id, self.inner.clone(), info);
let room = Room::restore(
&session_meta.user_id,
self.inner.clone(),
info,
client.roominfo_update_sender.read().unwrap().clone(),
);
self.rooms.write().unwrap().insert(room.room_id().to_owned(), room);
}
@@ -214,7 +223,12 @@ impl Store {
/// Lookup the Room for the given RoomId, or create one, if it didn't exist
/// yet in the store
pub fn get_or_create_room(&self, room_id: &RoomId, room_type: RoomState) -> Room {
pub fn get_or_create_room(
&self,
room_id: &RoomId,
room_type: RoomState,
client: &BaseClient,
) -> Room {
let user_id =
&self.session_meta.get().expect("Creating room while not being logged in").user_id;
@@ -222,7 +236,15 @@ impl Store {
.write()
.unwrap()
.entry(room_id.to_owned())
.or_insert_with(|| Room::new(user_id, self.inner.clone(), room_id, room_type))
.or_insert_with(|| {
Room::new(
user_id,
self.inner.clone(),
room_id,
room_type,
client.roominfo_update_sender.read().unwrap().clone(),
)
})
.clone()
}
}

View File

@@ -23,8 +23,9 @@ use eyeball_im_util::vector::VectorObserverExt;
use futures_util::{pin_mut, stream, Stream, StreamExt as _};
use matrix_sdk::{
executor::{spawn, JoinHandle},
RoomListEntry, SlidingSync, SlidingSyncList,
Client, RoomListEntry, SlidingSync, SlidingSyncList,
};
use tokio::select;
use super::{filters::Filter, Error, State};
@@ -123,6 +124,7 @@ impl RoomList {
pub fn entries_with_dynamic_adapters(
&self,
page_size: usize,
client: Client,
) -> (impl Stream<Item = Vec<VectorDiff<RoomListEntry>>>, RoomListDynamicEntriesController)
{
let list = self.sliding_sync_list.clone();
@@ -142,8 +144,41 @@ impl RoomList {
let stream = stream! {
loop {
let filter_fn = filter_fn_cell.take().await;
let (values, stream) = list
.room_list_stream()
let (raw_values, mut raw_stream) = list.room_list_stream();
let mut raw_current_values = raw_values.clone();
let client = client.clone();
let raw_stream_with_recv = stream! {
loop {
let mut roominfo_update_recv = client.roominfo_update_recv();
select! {
v = raw_stream.next() => {
if let Some(v) = v {
for change in &v {
change.clone().apply(&mut raw_current_values);
}
yield v;
} else {
break;
}
}
room_id = roominfo_update_recv.recv() => {
if let Ok(room_id) = room_id {
for (index, room) in raw_current_values.iter().enumerate() {
if let RoomListEntry::Filled(r) = room {
if r == &room_id {
let update = VectorDiff::Set { index, value: raw_current_values[index].clone() };
yield vec![update];
break;
}
}
}
}
}
}
}
};
let (values, stream) = (raw_values, raw_stream_with_recv)
.filter(filter_fn)
.dynamic_limit_with_initial_value(page_size, limit_stream.clone());

View File

@@ -1599,7 +1599,8 @@ async fn test_dynamic_entries_stream() -> Result<(), Error> {
let all_rooms = room_list.all_rooms().await?;
let (dynamic_entries_stream, dynamic_entries) = all_rooms.entries_with_dynamic_adapters(5);
let (dynamic_entries_stream, dynamic_entries) =
all_rooms.entries_with_dynamic_adapters(5, client.clone());
pin_mut!(dynamic_entries_stream);
sync_then_assert_request_and_fake_response! {

View File

@@ -212,6 +212,10 @@ pub(crate) struct ClientLocks {
/// outside the `OlmMachine`.
#[cfg(feature = "e2e-encryption")]
pub(crate) crypto_store_generation: Arc<Mutex<Option<u64>>>,
/// Ensure that only one service is syncing at a time. Otherwise, they will
/// override each other's updates because they batch changes.
pub(crate) sync_service_lock: Mutex<()>,
}
pub(crate) struct ClientInner {
@@ -241,6 +245,9 @@ pub(crate) struct ClientInner {
/// Notification handlers. See `register_notification_handler`.
notification_handlers: RwLock<Vec<NotificationHandlerFn>>,
pub(crate) room_update_channels: StdMutex<BTreeMap<OwnedRoomId, broadcast::Sender<RoomUpdate>>>,
/// An update is sent every time the info of a room changes. This is used to
/// trigger updates for the UI.
roominfo_update_recv: broadcast::Receiver<OwnedRoomId>,
/// Whether the client should update its homeserver URL with the discovery
/// information present in the login response.
respect_login_well_known: bool,
@@ -276,6 +283,9 @@ impl ClientInner {
respect_login_well_known: bool,
#[cfg(feature = "e2e-encryption")] encryption_settings: EncryptionSettings,
) -> Arc<Self> {
let (roominfo_update_sender, roominfo_update_recv) = broadcast::channel(100);
base_client.set_roominfo_update_sender(roominfo_update_sender.clone());
let client = Self {
homeserver: StdRwLock::new(homeserver),
auth_ctx,
@@ -291,6 +301,7 @@ impl ClientInner {
event_handlers: Default::default(),
notification_handlers: Default::default(),
room_update_channels: Default::default(),
roominfo_update_recv,
respect_login_well_known,
sync_beat: event_listener::Event::new(),
#[cfg(feature = "e2e-encryption")]
@@ -844,6 +855,10 @@ impl Client {
}
}
pub fn roominfo_update_recv(&self) -> broadcast::Receiver<OwnedRoomId> {
self.inner.roominfo_update_recv.resubscribe()
}
pub(crate) async fn notification_handlers(
&self,
) -> RwLockReadGuard<'_, Vec<NotificationHandlerFn>> {

View File

@@ -693,7 +693,11 @@ impl SlidingSync {
}
// Handle the response.
// This will batch changes to rooms, so no other service should run at the same
// time
let lock = this.inner.client.locks().sync_service_lock.lock().await;
let updates = this.handle_response(response, &mut position_guard).await?;
drop(lock);
this.cache_to_storage(&position_guard).await?;