feat(sdk): Remove clones of large responses in Sliding Sync

feat(sdk): Remove clones of large responses in Sliding Sync
This commit is contained in:
Ivan Enderlin
2023-02-27 09:47:26 +01:00
committed by GitHub
4 changed files with 74 additions and 44 deletions

View File

@@ -1,12 +1,16 @@
use std::collections::BTreeMap;
#[cfg(feature = "e2e-encryption")]
use std::ops::Deref;
use ruma::api::client::sync::sync_events::{
v3::{self, Ephemeral},
v4,
};
#[cfg(feature = "e2e-encryption")]
use ruma::UserId;
use ruma::{
api::client::sync::sync_events::{
v3::{self, Ephemeral},
v4, DeviceLists,
},
DeviceKeyAlgorithm, UInt,
};
use tracing::{debug, info, instrument};
use super::BaseClient;
@@ -26,7 +30,7 @@ impl BaseClient {
/// * `response` - The response that we received after a successful sliding
/// sync.
#[instrument(skip_all, level = "trace")]
pub async fn process_sliding_sync(&self, response: v4::Response) -> Result<SyncResponse> {
pub async fn process_sliding_sync(&self, response: &v4::Response) -> Result<SyncResponse> {
#[allow(unused_variables)]
let v4::Response {
// FIXME not yet supported by sliding sync. see
@@ -39,6 +43,7 @@ impl BaseClient {
//presence,
..
} = response;
info!(rooms = rooms.len(), lists = lists.len(), extensions = !extensions.is_empty());
if rooms.is_empty() && extensions.is_empty() {
@@ -49,21 +54,35 @@ impl BaseClient {
let v4::Extensions { to_device, e2ee, account_data, .. } = extensions;
let to_device_events = to_device.map(|v4| v4.events).unwrap_or_default();
let to_device_events = to_device.as_ref().map(|v4| v4.events.clone()).unwrap_or_default();
// Destructure the single `None` of the E2EE extension into separate objects
// since that's what the OlmMachine API expects. Passing in the default
// empty maps and vecs for this is completely fine, since the OlmMachine
// since that's what the `OlmMachine` API expects. Passing in the default
// empty maps and vecs for this is completely fine, since the `OlmMachine`
// assumes empty maps/vecs mean no change in the one-time key counts.
// We declare default values that can be referenced below. When extracting
// values from `e2ee`, it would be unfortunate to clone a value just to
// pass it along later; instead we borrow it, falling back to these defaults.
let device_one_time_keys_count = BTreeMap::<DeviceKeyAlgorithm, UInt>::default();
let device_unused_fallback_key_types = None;
let (device_lists, device_one_time_keys_count, device_unused_fallback_key_types) = e2ee
.as_ref()
.map(|e2ee| {
(
e2ee.device_lists,
e2ee.device_one_time_keys_count,
e2ee.device_unused_fallback_key_types,
e2ee.device_lists.clone(),
&e2ee.device_one_time_keys_count,
&e2ee.device_unused_fallback_key_types,
)
})
.unwrap_or_default();
.unwrap_or_else(|| {
(
DeviceLists::default(),
&device_one_time_keys_count,
&device_unused_fallback_key_types,
)
});
info!(
to_device_events = to_device_events.len(),
@@ -80,7 +99,7 @@ impl BaseClient {
self.preprocess_to_device_events(
to_device_events,
&device_lists,
&device_one_time_keys_count,
device_one_time_keys_count,
device_unused_fallback_key_types.as_deref(),
)
.await?
@@ -98,14 +117,14 @@ impl BaseClient {
let mut new_rooms = Rooms::default();
for (room_id, room_data) in rooms.into_iter() {
for (room_id, room_data) in rooms {
if !room_data.invite_state.is_empty() {
let invite_states = &room_data.invite_state;
let room = store.get_or_create_stripped_room(&room_id).await;
let room = store.get_or_create_stripped_room(room_id).await;
let mut room_info = room.clone_info();
room_info.mark_state_partially_synced();
if let Some(r) = store.get_room(&room_id) {
if let Some(r) = store.get_room(room_id) {
let mut room_info = r.clone_info();
room_info.mark_as_invited(); // FIXME: this might not be accurate
room_info.mark_state_partially_synced();
@@ -119,7 +138,7 @@ impl BaseClient {
v3::InvitedRoom::from(v3::InviteState::from(invite_states.clone())),
);
} else {
let room = store.get_or_create_room(&room_id, RoomType::Joined).await;
let room = store.get_or_create_room(room_id, RoomType::Joined).await;
let mut room_info = room.clone_info();
room_info.mark_as_joined(); // FIXME: this might not be accurate
room_info.mark_state_partially_synced();
@@ -153,8 +172,8 @@ impl BaseClient {
// }
let room_account_data = if let Some(inner_account_data) = &account_data {
if let Some(events) = inner_account_data.rooms.get(&room_id) {
self.handle_room_account_data(&room_id, events, &mut changes).await;
if let Some(events) = inner_account_data.rooms.get(room_id) {
self.handle_room_account_data(room_id, events, &mut changes).await;
Some(events.to_vec())
} else {
None
@@ -171,8 +190,8 @@ impl BaseClient {
.handle_timeline(
&room,
room_data.limited,
room_data.timeline,
room_data.prev_batch,
room_data.timeline.clone(),
room_data.prev_batch.clone(),
&push_rules,
&mut user_ids,
&mut room_info,
@@ -188,8 +207,8 @@ impl BaseClient {
// The room turned on encryption in this sync, we need
// to also get all the existing users and mark them for
// tracking.
let joined = store.get_joined_user_ids(&room_id).await?;
let invited = store.get_invited_user_ids(&room_id).await?;
let joined = store.get_joined_user_ids(room_id).await?;
let invited = store.get_invited_user_ids(room_id).await?;
let user_ids: Vec<&UserId> =
joined.iter().chain(&invited).map(Deref::deref).collect();
@@ -246,7 +265,7 @@ impl BaseClient {
debug!("applied changes");
let device_one_time_keys_count =
device_one_time_keys_count.into_iter().map(|(k, v)| (k, v.into())).collect();
device_one_time_keys_count.iter().map(|(k, v)| (k.clone(), (*v).into())).collect();
Ok(SyncResponse {
rooms: new_rooms,
@@ -254,7 +273,7 @@ impl BaseClient {
notifications: changes.notifications,
// FIXME not yet supported by sliding sync.
presence: Default::default(),
account_data: account_data.map(|a| a.global).unwrap_or_default(),
account_data: account_data.as_ref().map(|a| a.global.clone()).unwrap_or_default(),
to_device_events,
device_lists,
device_one_time_keys_count,

View File

@@ -14,11 +14,12 @@ impl Client {
#[instrument(skip(self, response))]
pub(crate) async fn process_sliding_sync(
&self,
response: v4::Response,
response: &v4::Response,
) -> Result<SyncResponse> {
let response = self.base_client().process_sliding_sync(response).await?;
debug!("done processing on base_client");
self.handle_sync_response(&response).await?;
Ok(response)
}
}

View File

@@ -975,28 +975,39 @@ impl SlidingSync {
self.rooms.read().unwrap().values().cloned().collect()
}
/// Handle the HTTP response.
#[instrument(skip_all, fields(views = views.len()))]
async fn handle_response(
&self,
resp: v4::Response,
sliding_sync_response: v4::Response,
extensions: Option<ExtensionsConfig>,
views: &mut BTreeMap<String, SlidingSyncViewRequestGenerator>,
) -> Result<UpdateSummary, crate::Error> {
let mut processed = self.client.process_sliding_sync(resp.clone()).await?;
debug!("main client processed.");
Observable::set(&mut self.pos.write().unwrap(), Some(resp.pos));
Observable::set(&mut self.delta_token.write().unwrap(), resp.delta_token);
// Handle and transform a Sliding Sync Response to a `SyncResponse`.
//
// We may not need the `sync_response` in the future (once `SyncResponse`
// has moved to Sliding Sync, i.e. to `v4::Response`), but processing the
// `sliding_sync_response` is vital, so it must be done somewhere; for now it
// happens here.
let mut sync_response = self.client.process_sliding_sync(&sliding_sync_response).await?;
let update = {
debug!("sliding sync response has been processed");
Observable::set(&mut self.pos.write().unwrap(), Some(sliding_sync_response.pos));
Observable::set(&mut self.delta_token.write().unwrap(), sliding_sync_response.delta_token);
let update_summary = {
let mut rooms = Vec::new();
let mut rooms_map = self.rooms.write().unwrap();
for (id, mut room_data) in resp.rooms.into_iter() {
let timeline = if let Some(joined_room) = processed.rooms.join.remove(&id) {
for (id, mut room_data) in sliding_sync_response.rooms.into_iter() {
// `sync_response` contains the rooms with decrypted events (if any), so
// prefer its timeline events when the room is present there; otherwise,
// fall back to the timeline inside the `sliding_sync_response`.
let timeline = if let Some(joined_room) = sync_response.rooms.join.remove(&id) {
joined_room.timeline.events
} else {
let events = room_data.timeline.into_iter().map(Into::into).collect();
room_data.timeline = vec![];
events
room_data.timeline.drain(..).map(Into::into).collect()
};
if let Some(mut room) = rooms_map.remove(&id) {
@@ -1018,7 +1029,7 @@ impl SlidingSync {
let mut updated_views = Vec::new();
for (name, updates) in resp.lists {
for (name, updates) in sliding_sync_response.lists {
let Some(generator) = views.get_mut(&name) else {
error!("Response for view {name} - unknown to us. skipping");
continue
@@ -1031,7 +1042,9 @@ impl SlidingSync {
}
// Update the `to-device` next-batch if found.
if let Some(to_device_since) = resp.extensions.to_device.map(|t| t.next_batch) {
if let Some(to_device_since) =
sliding_sync_response.extensions.to_device.map(|t| t.next_batch)
{
self.update_to_device_since(to_device_since)
}
@@ -1046,7 +1059,7 @@ impl SlidingSync {
self.cache_to_storage().await?;
Ok(update)
Ok(update_summary)
}
async fn sync_once(

View File

@@ -41,12 +41,9 @@ impl SlidingSyncRoom {
pub(super) fn new(
client: Client,
room_id: OwnedRoomId,
mut inner: v4::SlidingSyncRoom,
inner: v4::SlidingSyncRoom,
timeline: Vec<SyncTimelineEvent>,
) -> Self {
// we overwrite to only keep one copy
inner.timeline = vec![];
let mut timeline_queue = ObservableVector::new();
timeline_queue.append(timeline.into_iter().collect());