feat(sdk): Remove clones of large responses in Sliding Sync.

I admit this patch is quite tricky. Please try to follow me.

So, first off, in `SlidingSyncRoom::new`, we were clearing the timeline,
because somehow it exists twice in memory at this step.

Which led me to understand how `SlidingSync::handle_response` was working.
I've clarified how this part of the code works. We are dealing with 2 kind of
responses for a specific reason: `SyncResponse` and `v4::Response`, now it's
documented and I hope it's clearer.

Then, I notice that we were passing a clone of the entire sliding sync
response (`v4::Response`) to `Client::process_sliding_sync`. I thought it was
suboptimal, so I've updated the code to take a reference. It led me to update
`BaseClient::process_sliding_sync`. It was a little bit tricky, but I reckon we
have less clones now than before.

And now, back to `SlidingSync::handle_response`, I was able to compute the
`timeline` correctly by draining it from the `v4::Response`, or by moving it
from `SyncResponse`. So it's no longer necessary to have this clearing code
inside `SlidingSyncRoom::new`. Honestly it has nothing to do at this place
before.

To conclude: We have cleaner code, and less clones.

What thing I reckon could be optimized, is that the entire `timeline`
(`Vec<TimelineEvent>`) is cloned to be passed to `Client::handle_timeline`. So
this timeline exists in 2 places: in Sliding Sync, and somewhere else. I don't
believe it's a problem now, that's how it works, but we must be aware of that.
This commit is contained in:
Ivan Enderlin
2023-02-23 15:06:09 +01:00
parent d0c8ec7a22
commit c69d28ef79
4 changed files with 59 additions and 31 deletions

View File

@@ -26,7 +26,7 @@ impl BaseClient {
/// * `response` - The response that we received after a successful sliding
/// sync.
#[instrument(skip_all, level = "trace")]
pub async fn process_sliding_sync(&self, response: v4::Response) -> Result<SyncResponse> {
pub async fn process_sliding_sync(&self, response: &v4::Response) -> Result<SyncResponse> {
#[allow(unused_variables)]
let v4::Response {
// FIXME not yet supported by sliding sync. see
@@ -39,6 +39,7 @@ impl BaseClient {
//presence,
..
} = response;
info!(rooms = rooms.len(), lists = lists.len(), extensions = !extensions.is_empty());
if rooms.is_empty() && extensions.is_empty() {
@@ -49,21 +50,31 @@ impl BaseClient {
let v4::Extensions { to_device, e2ee, account_data, .. } = extensions;
let to_device_events = to_device.map(|v4| v4.events).unwrap_or_default();
let to_device_events = to_device.as_ref().map(|v4| v4.events.clone()).unwrap_or_default();
// Destructure the single `None` of the E2EE extension into separate objects
// since that's what the OlmMachine API expects. Passing in the default
// empty maps and vecs for this is completely fine, since the OlmMachine
// since that's what the `OlmMachine` API expects. Passing in the default
// empty maps and vecs for this is completely fine, since the `OlmMachine`
// assumes empty maps/vecs mean no change in the one-time key counts.
// We declare default values that can be referenced hereinbelow. When we try to
// extract values from `e2ee`, that would be unfortunate to clone the
// value just to pass them (to remove them `e2ee`) as a reference later.
let device_one_time_keys_count = Default::default();
let device_unused_fallback_key_types = Default::default();
let (device_lists, device_one_time_keys_count, device_unused_fallback_key_types) = e2ee
.as_ref()
.map(|e2ee| {
(
e2ee.device_lists,
e2ee.device_one_time_keys_count,
e2ee.device_unused_fallback_key_types,
e2ee.device_lists.clone(),
&e2ee.device_one_time_keys_count,
&e2ee.device_unused_fallback_key_types,
)
})
.unwrap_or_default();
.unwrap_or_else(|| {
(Default::default(), &device_one_time_keys_count, &device_unused_fallback_key_types)
});
info!(
to_device_events = to_device_events.len(),
@@ -153,7 +164,7 @@ impl BaseClient {
// }
let room_account_data = if let Some(inner_account_data) = &account_data {
if let Some(events) = inner_account_data.rooms.get(&room_id) {
if let Some(events) = inner_account_data.rooms.get(&*room_id) {
self.handle_room_account_data(&room_id, events, &mut changes).await;
Some(events.to_vec())
} else {
@@ -171,8 +182,8 @@ impl BaseClient {
.handle_timeline(
&room,
room_data.limited,
room_data.timeline,
room_data.prev_batch,
room_data.timeline.clone(),
room_data.prev_batch.clone(),
&push_rules,
&mut user_ids,
&mut room_info,
@@ -246,7 +257,7 @@ impl BaseClient {
debug!("applied changes");
let device_one_time_keys_count =
device_one_time_keys_count.into_iter().map(|(k, v)| (k, v.into())).collect();
device_one_time_keys_count.iter().map(|(k, v)| (k.clone(), (*v).into())).collect();
Ok(SyncResponse {
rooms: new_rooms,
@@ -254,7 +265,7 @@ impl BaseClient {
notifications: changes.notifications,
// FIXME not yet supported by sliding sync.
presence: Default::default(),
account_data: account_data.map(|a| a.global).unwrap_or_default(),
account_data: account_data.as_ref().map(|a| a.global.clone()).unwrap_or_default(),
to_device_events,
device_lists,
device_one_time_keys_count,

View File

@@ -14,11 +14,12 @@ impl Client {
#[instrument(skip(self, response))]
pub(crate) async fn process_sliding_sync(
&self,
response: v4::Response,
response: &v4::Response,
) -> Result<SyncResponse> {
let response = self.base_client().process_sliding_sync(response).await?;
debug!("done processing on base_client");
self.handle_sync_response(&response).await?;
Ok(response)
}
}

View File

@@ -975,28 +975,45 @@ impl SlidingSync {
self.rooms.read().unwrap().values().cloned().collect()
}
/// Handle the HTTP response.
///
/// But which response? `v4::Response`, aka the Sliding Sync response, or
/// `SyncResponse` which still relies on `v3`?
/// Well that's tricky. We have both here, because this Sliding Sync
/// implementation is still experimental, and we didn't want to be too
/// invasive. Thus, `SyncResponse` doesn't support Sliding Sync yet. Hence
/// the fact this method handles both at the same time. It's not super
/// annoying but it was important to clarify that.
#[instrument(skip_all, fields(views = views.len()))]
async fn handle_response(
&self,
resp: v4::Response,
sliding_sync_response: v4::Response,
extensions: Option<ExtensionsConfig>,
views: &mut BTreeMap<String, SlidingSyncViewRequestGenerator>,
) -> Result<UpdateSummary, crate::Error> {
let mut processed = self.client.process_sliding_sync(resp.clone()).await?;
debug!("main client processed.");
Observable::set(&mut self.pos.write().unwrap(), Some(resp.pos));
Observable::set(&mut self.delta_token.write().unwrap(), resp.delta_token);
// We may not need the `sync_response` in the future (once `SyncResponse` will
// move to Sliding Sync, i.e. to `v4::Response`), but processing the
// `sliding_sync_response` is vital, so it must be done somewhere; for now it
// happens here.
let mut sync_response = self.client.process_sliding_sync(&sliding_sync_response).await?;
let update = {
debug!("sliding sync response has been processed");
Observable::set(&mut self.pos.write().unwrap(), Some(sliding_sync_response.pos));
Observable::set(&mut self.delta_token.write().unwrap(), sliding_sync_response.delta_token);
let update_summary = {
let mut rooms = Vec::new();
let mut rooms_map = self.rooms.write().unwrap();
for (id, mut room_data) in resp.rooms.into_iter() {
let timeline = if let Some(joined_room) = processed.rooms.join.remove(&id) {
for (id, mut room_data) in sliding_sync_response.rooms.into_iter() {
// `sync_response` contains the rooms with decrypted events if any, so look at
// the timeline events here first if the room exists.
// Otherwise, let's look at the timeline inside the `sliding_sync_response`.
let timeline = if let Some(joined_room) = sync_response.rooms.join.remove(&id) {
joined_room.timeline.events
} else {
let events = room_data.timeline.into_iter().map(Into::into).collect();
room_data.timeline = vec![];
events
room_data.timeline.drain(..).map(Into::into).collect()
};
if let Some(mut room) = rooms_map.remove(&id) {
@@ -1018,7 +1035,7 @@ impl SlidingSync {
let mut updated_views = Vec::new();
for (name, updates) in resp.lists {
for (name, updates) in sliding_sync_response.lists {
let Some(generator) = views.get_mut(&name) else {
error!("Response for view {name} - unknown to us. skipping");
continue
@@ -1031,7 +1048,9 @@ impl SlidingSync {
}
// Update the `to-device` next-batch if found.
if let Some(to_device_since) = resp.extensions.to_device.map(|t| t.next_batch) {
if let Some(to_device_since) =
sliding_sync_response.extensions.to_device.map(|t| t.next_batch)
{
self.update_to_device_since(to_device_since)
}
@@ -1046,7 +1065,7 @@ impl SlidingSync {
self.cache_to_storage().await?;
Ok(update)
Ok(update_summary)
}
async fn sync_once(

View File

@@ -44,9 +44,6 @@ impl SlidingSyncRoom {
mut inner: v4::SlidingSyncRoom,
timeline: Vec<SyncTimelineEvent>,
) -> Self {
// we overwrite to only keep one copy
inner.timeline = vec![];
let mut timeline_queue = ObservableVector::new();
timeline_queue.append(timeline.into_iter().collect());