feat(sliding sync): only process encryption events (resp. room events) if configured as so

This commit is contained in:
Benjamin Bouvier
2023-07-20 15:03:31 +02:00
parent da7d1b092e
commit 8a809dba04
8 changed files with 117 additions and 79 deletions

View File

@@ -916,12 +916,6 @@ impl BaseClient {
presence: response.presence.events,
account_data: response.account_data.events,
to_device,
device_lists: response.device_lists,
device_one_time_keys_count: response
.device_one_time_keys_count
.into_iter()
.map(|(k, v)| (k, v.into()))
.collect(),
ambiguity_changes: AmbiguityChanges { changes: ambiguity_cache.changes },
notifications: changes.notifications,
};

View File

@@ -21,7 +21,8 @@ use ruma::{
v3::{self, InvitedRoom, RoomSummary},
v4::{self, AccountData},
},
events::AnySyncStateEvent,
events::{AnySyncStateEvent, AnyToDeviceEvent},
serde::Raw,
RoomId,
};
use tracing::{debug, info, instrument, warn};
@@ -41,6 +42,57 @@ use crate::{
};
impl BaseClient {
#[cfg(feature = "e2e-encryption")]
/// Processes the E2EE-related events from the Sliding Sync response.
///
/// In addition to writes to the crypto store, this may also write into the
/// state store, in particular it may write latest-events to the state
/// store.
pub async fn process_sliding_sync_e2ee(
&self,
extensions: &v4::Extensions,
) -> Result<Vec<Raw<AnyToDeviceEvent>>> {
if extensions.is_empty() {
return Ok(Default::default());
}
let v4::Extensions { to_device, e2ee, .. } = extensions;
let to_device = to_device.as_ref().map(|v4| v4.events.clone()).unwrap_or_default();
info!(
to_device_events = to_device.len(),
device_one_time_keys_count = e2ee.device_one_time_keys_count.len(),
device_unused_fallback_key_types =
e2ee.device_unused_fallback_key_types.as_ref().map(|v| v.len())
);
let mut changes = StateChanges::default();
// Process the to-device events and other related e2ee data. This returns a list
// of all the to-device events that were passed in but encrypted ones
// were replaced with their decrypted version.
// Passing in the default empty maps and vecs for this is completely fine, since
// the `OlmMachine` assumes empty maps/vecs mean no change in the one-time key
// counts.
let to_device = self
.preprocess_to_device_events(
to_device,
&e2ee.device_lists,
&e2ee.device_one_time_keys_count,
e2ee.device_unused_fallback_key_types.as_deref(),
&mut changes,
)
.await?;
debug!("ready to submit changes to store");
self.store.save_changes(&changes).await?;
self.apply_changes(&changes).await;
debug!("applied changes");
Ok(to_device)
}
/// Process a response from a sliding sync call.
///
/// # Arguments
@@ -49,7 +101,6 @@ impl BaseClient {
/// sync.
#[instrument(skip_all, level = "trace")]
pub async fn process_sliding_sync(&self, response: &v4::Response) -> Result<SyncResponse> {
#[allow(unused_variables)]
let v4::Response {
// FIXME not yet supported by sliding sync. see
// https://github.com/matrix-org/matrix-rust-sdk/issues/1014
@@ -70,36 +121,10 @@ impl BaseClient {
return Ok(SyncResponse::default());
};
let v4::Extensions { to_device, e2ee, account_data, receipts, .. } = extensions;
let to_device = to_device.as_ref().map(|v4| v4.events.clone()).unwrap_or_default();
info!(
to_device_events = to_device.len(),
device_one_time_keys_count = e2ee.device_one_time_keys_count.len(),
device_unused_fallback_key_types =
e2ee.device_unused_fallback_key_types.as_ref().map(|v| v.len())
);
let v4::Extensions { account_data, receipts, .. } = extensions;
let mut changes = StateChanges::default();
// Process the to-device events and other related e2ee data. This returns a list
// of all the to-device events that were passed in but encrypted ones
// were replaced with their decrypted version.
// Passing in the default empty maps and vecs for this is completely fine, since
// the `OlmMachine` assumes empty maps/vecs mean no change in the one-time key
// counts.
#[cfg(feature = "e2e-encryption")]
let to_device = self
.preprocess_to_device_events(
to_device,
&e2ee.device_lists,
&e2ee.device_one_time_keys_count,
e2ee.device_unused_fallback_key_types.as_deref(),
&mut changes,
)
.await?;
let store = self.store.clone();
let mut ambiguity_cache = AmbiguityCache::new(store.inner.clone());
@@ -172,9 +197,6 @@ impl BaseClient {
self.apply_changes(&changes).await;
debug!("applied changes");
let device_one_time_keys_count =
e2ee.device_one_time_keys_count.iter().map(|(k, v)| (k.clone(), (*v).into())).collect();
Ok(SyncResponse {
rooms: new_rooms,
ambiguity_changes: AmbiguityChanges { changes: ambiguity_cache.changes },
@@ -182,9 +204,7 @@ impl BaseClient {
// FIXME not yet supported by sliding sync.
presence: Default::default(),
account_data: account_data.global.clone(),
to_device,
device_lists: e2ee.device_lists.clone(),
device_one_time_keys_count,
to_device: Default::default(),
})
}

View File

@@ -21,7 +21,7 @@ use ruma::{
api::client::{
push::get_notifications::v3::Notification,
sync::sync_events::{
v3::InvitedRoom, DeviceLists, UnreadNotificationsCount as RumaUnreadNotificationsCount,
v3::InvitedRoom, UnreadNotificationsCount as RumaUnreadNotificationsCount,
},
},
events::{
@@ -29,7 +29,7 @@ use ruma::{
AnySyncEphemeralRoomEvent, AnySyncStateEvent, AnyToDeviceEvent,
},
serde::Raw,
DeviceKeyAlgorithm, OwnedRoomId,
OwnedRoomId,
};
use serde::{Deserialize, Serialize};
@@ -54,13 +54,6 @@ pub struct SyncResponse {
pub account_data: Vec<Raw<AnyGlobalAccountDataEvent>>,
/// Messages sent directly between devices.
pub to_device: Vec<Raw<AnyToDeviceEvent>>,
/// Information on E2E device updates.
///
/// Only present on an incremental sync.
pub device_lists: DeviceLists,
/// For each key algorithm, the number of unclaimed one-time keys
/// currently held on the server for a device.
pub device_one_time_keys_count: BTreeMap<DeviceKeyAlgorithm, u64>,
/// Collection of ambiguity changes that room member events trigger.
pub ambiguity_changes: AmbiguityChanges,
/// New notifications per room.
@@ -74,8 +67,6 @@ impl fmt::Debug for SyncResponse {
.field("rooms", &self.rooms)
.field("account_data", &DebugListOfRawEventsNoId(&self.account_data))
.field("to_device", &DebugListOfRawEventsNoId(&self.to_device))
.field("device_lists", &self.device_lists)
.field("device_one_time_keys_count", &self.device_one_time_keys_count)
.field("ambiguity_changes", &self.ambiguity_changes)
.field("notifications", &DebugNotificationMap(&self.notifications))
.finish_non_exhaustive()

View File

@@ -537,7 +537,7 @@ mod test {
// And the room is stored in the client so it can be extracted when needed
let response = response_with_room(room_id, room).await;
client.process_sliding_sync(&response).await.unwrap();
client.process_sliding_sync(&response, vec![]).await.unwrap();
// When we construct a timeline event from it
let timeline_item = EventTimelineItem::from_latest_event(&ss_room, event).await.unwrap();

View File

@@ -122,7 +122,7 @@ mod tests {
let mut room = v4::SlidingSyncRoom::new();
room.timeline.push(event.event);
let response = response_with_room(room_id, room).await;
client.process_sliding_sync(&response).await.unwrap();
client.process_sliding_sync(&response, vec![]).await.unwrap();
}
fn message_event(

View File

@@ -1,5 +1,5 @@
use matrix_sdk_base::sync::SyncResponse;
use ruma::api::client::sync::sync_events::v4;
use ruma::{api::client::sync::sync_events::v4, events::AnyToDeviceEvent, serde::Raw};
use tracing::{debug, instrument};
use super::{SlidingSync, SlidingSyncBuilder};
@@ -14,10 +14,28 @@ impl Client {
Ok(SlidingSync::builder(id.into(), self.clone())?)
}
/// Handle all the information provided in a sliding sync response
/// Handle all the e2ee information provided in a sliding sync response.
#[cfg(feature = "e2e-encryption")]
pub(crate) async fn process_sliding_sync_e2ee(
&self,
extensions: &v4::Extensions,
) -> Result<Vec<Raw<AnyToDeviceEvent>>> {
Ok(self.base_client().process_sliding_sync_e2ee(extensions).await?)
}
/// Handle all the information provided in a sliding sync response, except
/// for the e2ee bits that are handled by `process_sliding_sync_e2ee`
/// (and which results can be passed as the second argument).
#[instrument(skip(self, response))]
pub async fn process_sliding_sync(&self, response: &v4::Response) -> Result<SyncResponse> {
let response = self.base_client().process_sliding_sync(response).await?;
pub async fn process_sliding_sync(
&self,
response: &v4::Response,
to_device_events: Vec<Raw<AnyToDeviceEvent>>,
) -> Result<SyncResponse> {
let mut response = self.base_client().process_sliding_sync(response).await?;
response.to_device.extend(to_device_events);
debug!("done processing on base_client");
self.handle_sync_response(&response).await?;

View File

@@ -38,6 +38,7 @@ pub use client::*;
pub use error::*;
use futures_core::stream::Stream;
pub use list::*;
use matrix_sdk_base::sync::SyncResponse;
pub use room::*;
use ruma::{
api::client::{
@@ -278,8 +279,30 @@ impl SlidingSync {
// move to Sliding Sync, i.e. to `v4::Response`), but processing the
// `sliding_sync_response` is vital, so it must be done somewhere; for now it
// happens here.
let mut sync_response =
self.inner.client.process_sliding_sync(&sliding_sync_response).await?;
let to_device_events = Vec::new();
#[cfg(feature = "e2e-encryption")]
let to_device_events = if self.is_e2ee_enabled() {
self.inner.client.process_sliding_sync_e2ee(&sliding_sync_response.extensions).await?
} else {
to_device_events
};
// Only handle the room's subsection of the response, if this sliding sync was configured
// to do so. That's because even when not requesting it, sometimes the current (2023-07-20)
// proxy will forward room events unrelated to the current connection's parameters.
// NOTE: SS proxy workaround.
let handle_room_response = {
!self.inner.sticky.read().unwrap().data().room_subscriptions.is_empty()
|| !self.inner.lists.read().await.is_empty()
};
let mut sync_response = if handle_room_response {
self.inner.client.process_sliding_sync(&sliding_sync_response, to_device_events).await?
} else {
assign!(SyncResponse::default(), { to_device: to_device_events })
};
debug!(?sync_response, "Sliding Sync response has been handled by the client");
@@ -464,6 +487,12 @@ impl SlidingSync {
))
}
#[cfg(feature = "e2e-encryption")]
/// Is the e2ee extension enabled for this sliding sync instance?
fn is_e2ee_enabled(&self) -> bool {
self.inner.sticky.read().unwrap().data().extensions.e2ee.enabled == Some(true)
}
#[instrument(skip_all, fields(pos))]
async fn sync_once(&self) -> Result<UpdateSummary> {
let (request, request_config, requested_room_unsubscriptions) =
@@ -486,7 +515,7 @@ impl SlidingSync {
#[cfg(feature = "e2e-encryption")]
let response = {
if self.inner.sticky.read().unwrap().data().extensions.e2ee.enabled == Some(true) {
if self.is_e2ee_enabled() {
debug!("Sliding Sync is sending the request along with outgoing E2EE requests");
// Here, we need to run 2 things:
@@ -570,7 +599,7 @@ impl SlidingSync {
// unsubscriptions buffer. However, it would be an error to empty it entirely as
// more unsubscriptions could have been inserted during the request/response
// dance. So let's cherry-pick which unsubscriptions to remove.
{
if !requested_room_unsubscriptions.is_empty() {
let room_unsubscriptions = &mut *this.inner.room_unsubscriptions.write().unwrap();
room_unsubscriptions
@@ -828,6 +857,7 @@ impl StickyData for SlidingSyncStickyParameters {
/// correctly, so we cheat and "correct" it using heuristics here.
/// TODO remove this workaround as soon as support of the `limited` flag is
/// properly implemented in the open-source proxy: https://github.com/matrix-org/sliding-sync/issues/197
// NOTE: SS proxy workaround.
fn compute_limited(
known_rooms: &BTreeMap<OwnedRoomId, SlidingSyncRoom>,
response_rooms: &mut BTreeMap<OwnedRoomId, v4::SlidingSyncRoom>,

View File

@@ -31,11 +31,11 @@ use matrix_sdk_base::{
use ruma::{
api::client::{
push::get_notifications::v3::Notification,
sync::sync_events::{self, v3::InvitedRoom, DeviceLists},
sync::sync_events::{self, v3::InvitedRoom},
},
events::{presence::PresenceEvent, AnyGlobalAccountDataEvent, AnyToDeviceEvent},
serde::Raw,
DeviceKeyAlgorithm, OwnedRoomId, RoomId,
OwnedRoomId, RoomId,
};
use tracing::{debug, error, warn};
@@ -55,13 +55,6 @@ pub struct SyncResponse {
pub account_data: Vec<Raw<AnyGlobalAccountDataEvent>>,
/// Messages sent directly between devices.
pub to_device: Vec<Raw<AnyToDeviceEvent>>,
/// Information on E2E device updates.
///
/// Only present on an incremental sync.
pub device_lists: DeviceLists,
/// For each key algorithm, the number of unclaimed one-time keys
/// currently held on the server for a device.
pub device_one_time_keys_count: BTreeMap<DeviceKeyAlgorithm, u64>,
/// Collection of ambiguity changes that room member events trigger.
pub ambiguity_changes: AmbiguityChanges,
/// New notifications per room.
@@ -75,8 +68,6 @@ impl SyncResponse {
presence,
account_data,
to_device,
device_lists,
device_one_time_keys_count,
ambiguity_changes,
notifications,
} = base_response;
@@ -87,8 +78,6 @@ impl SyncResponse {
presence,
account_data,
to_device,
device_lists,
device_one_time_keys_count,
ambiguity_changes,
notifications,
}
@@ -102,8 +91,6 @@ impl fmt::Debug for SyncResponse {
.field("rooms", &self.rooms)
.field("account_data", &DebugListOfRawEventsNoId(&self.account_data))
.field("to_device", &DebugListOfRawEventsNoId(&self.to_device))
.field("device_lists", &self.device_lists)
.field("device_one_time_keys_count", &self.device_one_time_keys_count)
.field("ambiguity_changes", &self.ambiguity_changes)
.field("notifications", &DebugNotificationMap(&self.notifications))
.finish_non_exhaustive()
@@ -173,8 +160,6 @@ impl Client {
presence,
account_data,
to_device,
device_lists: _,
device_one_time_keys_count: _,
ambiguity_changes: _,
notifications,
} = response;