Refactor process_sliding_sync to extract a method for dealing with rooms

This commit is contained in:
Andy Balaam
2023-06-08 09:42:50 +01:00
parent a85c481368
commit 24ff62befc

View File

@@ -15,9 +15,12 @@
#[cfg(feature = "e2e-encryption")]
use std::ops::Deref;
use ruma::api::client::sync::sync_events::{
v3::{self, RoomSummary},
v4,
use ruma::{
api::client::sync::sync_events::{
v3::{self, InvitedRoom, RoomSummary},
v4::{self, AccountData},
},
RoomId,
};
use tracing::{debug, info, instrument};
@@ -28,8 +31,9 @@ use crate::{
deserialized_responses::AmbiguityChanges,
error::Result,
rooms::RoomState,
store::{ambiguity_map::AmbiguityCache, StateChanges},
store::{ambiguity_map::AmbiguityCache, StateChanges, Store},
sync::{JoinedRoom, Rooms, SyncResponse},
RoomInfo,
};
impl BaseClient {
@@ -97,120 +101,27 @@ impl BaseClient {
self.handle_account_data(&account_data.global, &mut changes).await;
}
let push_rules = self.get_push_rules(&changes).await?;
let mut new_rooms = Rooms::default();
for (room_id, room_data) in rooms {
if let Some(invite_state) = &room_data.invite_state {
let room = store.get_or_create_stripped_room(room_id).await;
let mut room_info = room.clone_info();
room_info.mark_state_partially_synced();
if let Some(r) = store.get_room(room_id) {
let mut room_info = r.clone_info();
room_info.mark_as_invited(); // FIXME: this might not be accurate
room_info.mark_state_partially_synced();
changes.add_room(room_info);
}
self.handle_invited_state(invite_state.as_slice(), &mut room_info, &mut changes);
new_rooms.invite.insert(
room_id.clone(),
v3::InvitedRoom::from(v3::InviteState::from(invite_state.clone())),
);
} else {
let room = store.get_or_create_room(room_id, RoomState::Joined).await;
let mut room_info = room.clone_info();
room_info.mark_as_joined(); // FIXME: this might not be accurate
room_info.mark_state_partially_synced();
if let Some(name) = &room_data.name {
room_info.update_name(name.to_owned());
}
// Sliding sync doesn't have a room summary, nevertheless it contains the joined
// and invited member counts. It likely will never have a heroes concept since
// it calculates the room display name for us.
//
// Let's at least fetch the member counts, since they might be useful.
let mut room_summary = RoomSummary::new();
room_summary.invited_member_count = room_data.invited_count;
room_summary.joined_member_count = room_data.joined_count;
room_info.update_summary(&room_summary);
room_info.set_prev_batch(room_data.prev_batch.as_deref());
let mut user_ids = if !room_data.required_state.is_empty() {
self.handle_state(
&room_data.required_state,
&mut room_info,
&mut changes,
&mut ambiguity_cache,
)
.await?
} else {
Default::default()
};
let room_account_data = if let Some(events) = account_data.rooms.get(room_id) {
self.handle_room_account_data(room_id, events, &mut changes).await;
Some(events.to_vec())
} else {
None
};
if room_data.limited {
room_info.mark_members_missing();
}
let timeline = self
.handle_timeline(
&room,
room_data.limited,
room_data.timeline.clone(),
room_data.prev_batch.clone(),
&push_rules,
&mut user_ids,
&mut room_info,
&mut changes,
&mut ambiguity_cache,
)
.await?;
#[cfg(feature = "e2e-encryption")]
if room_info.is_encrypted() {
if let Some(o) = self.olm_machine() {
if !room.is_encrypted() {
// The room turned on encryption in this sync, we need
// to also get all the existing users and mark them for
// tracking.
let user_ids =
store.get_user_ids(room_id, RoomMemberships::ACTIVE).await?;
o.update_tracked_users(user_ids.iter().map(Deref::deref)).await?
}
if !user_ids.is_empty() {
o.update_tracked_users(user_ids.iter().map(Deref::deref)).await?;
}
}
}
let notification_count = room_data.unread_notifications.clone().into();
room_info.update_notification_count(notification_count);
new_rooms.join.insert(
room_id.clone(),
JoinedRoom::new(
timeline,
room_data.required_state.clone(),
room_account_data.unwrap_or_default(),
Vec::new(),
notification_count,
),
);
changes.add_room(room_info);
let (room_to_add, joined_room, invited_room) = self
.process_sliding_sync_room(
room_id,
room_data,
&store,
&mut changes,
&mut ambiguity_cache,
account_data,
)
.await?;
if let Some(room_to_add) = room_to_add {
changes.add_room(room_to_add);
}
if let Some(joined_room) = joined_room {
new_rooms.join.insert(room_id.clone(), joined_room);
}
if let Some(invited_room) = invited_room {
new_rooms.invite.insert(room_id.clone(), invited_room);
}
}
@@ -272,6 +183,126 @@ impl BaseClient {
device_one_time_keys_count,
})
}
async fn process_sliding_sync_room(
&self,
room_id: &RoomId,
room_data: &v4::SlidingSyncRoom,
store: &Store,
changes: &mut StateChanges,
ambiguity_cache: &mut AmbiguityCache,
account_data: &AccountData,
) -> Result<(Option<RoomInfo>, Option<JoinedRoom>, Option<InvitedRoom>)> {
if let Some(invite_state) = &room_data.invite_state {
let room = store.get_or_create_stripped_room(room_id).await;
let mut room_info = room.clone_info();
room_info.mark_state_partially_synced();
let room_to_add = if let Some(r) = store.get_room(room_id) {
let mut room_info = r.clone_info();
room_info.mark_as_invited(); // FIXME: this might not be accurate
room_info.mark_state_partially_synced();
Some(room_info)
} else {
None
};
self.handle_invited_state(invite_state.as_slice(), &mut room_info, changes);
let invited_room = v3::InvitedRoom::from(v3::InviteState::from(invite_state.clone()));
Ok((room_to_add, None, Some(invited_room)))
} else {
let room = store.get_or_create_room(room_id, RoomState::Joined).await;
let mut room_info = room.clone_info();
room_info.mark_as_joined(); // FIXME: this might not be accurate
room_info.mark_state_partially_synced();
if let Some(name) = &room_data.name {
room_info.update_name(name.to_owned());
}
// Sliding sync doesn't have a room summary, nevertheless it contains the joined
// and invited member counts. It likely will never have a heroes concept since
// it calculates the room display name for us.
//
// Let's at least fetch the member counts, since they might be useful.
let mut room_summary = RoomSummary::new();
room_summary.invited_member_count = room_data.invited_count;
room_summary.joined_member_count = room_data.joined_count;
room_info.update_summary(&room_summary);
room_info.set_prev_batch(room_data.prev_batch.as_deref());
let mut user_ids = if !room_data.required_state.is_empty() {
self.handle_state(
&room_data.required_state,
&mut room_info,
changes,
ambiguity_cache,
)
.await?
} else {
Default::default()
};
let room_account_data = if let Some(events) = account_data.rooms.get(room_id) {
self.handle_room_account_data(room_id, events, changes).await;
Some(events.to_vec())
} else {
None
};
if room_data.limited {
room_info.mark_members_missing();
}
let push_rules = self.get_push_rules(changes).await?;
let timeline = self
.handle_timeline(
&room,
room_data.limited,
room_data.timeline.clone(),
room_data.prev_batch.clone(),
&push_rules,
&mut user_ids,
&mut room_info,
changes,
ambiguity_cache,
)
.await?;
#[cfg(feature = "e2e-encryption")]
if room_info.is_encrypted() {
if let Some(o) = self.olm_machine() {
if !room.is_encrypted() {
// The room turned on encryption in this sync, we need
// to also get all the existing users and mark them for
// tracking.
let user_ids = store.get_user_ids(room_id, RoomMemberships::ACTIVE).await?;
o.update_tracked_users(user_ids.iter().map(Deref::deref)).await?
}
if !user_ids.is_empty() {
o.update_tracked_users(user_ids.iter().map(Deref::deref)).await?;
}
}
}
let notification_count = room_data.unread_notifications.clone().into();
room_info.update_notification_count(notification_count);
let joined_room = JoinedRoom::new(
timeline,
room_data.required_state.clone(),
room_account_data.unwrap_or_default(),
Vec::new(),
notification_count,
);
Ok((Some(room_info), Some(joined_room), None))
}
}
}
#[cfg(test)]