fix: use a little state machine to handle the response

and that allows to make sure that all event handlers are correctly called for all
events contained in the response.
This commit is contained in:
Benjamin Bouvier
2023-07-24 15:41:28 +02:00
parent fb5c96e380
commit 020d5aa292
12 changed files with 88 additions and 50 deletions

View File

@@ -25,7 +25,7 @@ message-ids = ["matrix-sdk-crypto?/message-ids"]
experimental-sliding-sync = ["ruma/unstable-msc3575"]
# helpers for testing features build upon this
testing = ["dep:http", "dep:matrix-sdk-test", "dep:assert_matches"]
testing = ["dep:http", "dep:matrix-sdk-test", "dep:assert_matches", "matrix-sdk-crypto?/testing"]
[dependencies]
assert_matches = { workspace = true, optional = true }

View File

@@ -383,7 +383,7 @@ impl Room {
}
/// Update the last event in the room
#[cfg(feature = "experimental-sliding-sync")]
#[cfg(all(feature = "e2e-encryption", feature = "experimental-sliding-sync"))]
pub(crate) fn set_latest_event(&self, latest_event: Option<SyncTimelineEvent>) {
self.inner.write().unwrap().latest_event = latest_event;
}

View File

@@ -15,17 +15,21 @@
#[cfg(feature = "e2e-encryption")]
use std::ops::Deref;
#[cfg(feature = "e2e-encryption")]
use matrix_sdk_common::deserialized_responses::SyncTimelineEvent;
use ruma::{
api::client::sync::sync_events::{
v3::{self, InvitedRoom, RoomSummary},
v4::{self, AccountData},
},
events::{AnySyncStateEvent, AnyToDeviceEvent},
serde::Raw,
events::AnySyncStateEvent,
RoomId,
};
use tracing::{debug, info, instrument, warn};
#[cfg(feature = "e2e-encryption")]
use ruma::{events::AnyToDeviceEvent, serde::Raw};
#[cfg(feature = "e2e-encryption")]
use tracing::warn;
use tracing::{debug, info, instrument};
use super::BaseClient;
#[cfg(feature = "e2e-encryption")]
@@ -432,7 +436,7 @@ fn cache_latest_events(room: &mut Room, room_info: &mut RoomInfo, events: &[Sync
}
} else {
warn!(
"Failed to deserialise event as AnySyncTimelineEvent. ID={}",
"Failed to deserialize event as AnySyncTimelineEvent. ID={}",
e.event_id().expect("Event has no ID!")
);
}

View File

@@ -1887,8 +1887,8 @@ impl OlmMachine {
Arc::ptr_eq(&self.inner, &other.inner)
}
#[cfg(any(feature = "testing", test))]
/// Testing purposes only.
#[cfg(any(feature = "testing", test))]
pub fn uploaded_key_count(&self) -> u64 {
self.inner.account.uploaded_key_count()
}

View File

@@ -16,6 +16,7 @@ rustdoc-args = ["--cfg", "docsrs"]
[features]
default = ["e2e-encryption"]
e2e-encryption = ["matrix-sdk-base/e2e-encryption", "dep:matrix-sdk-crypto"]
testing = ["matrix-sdk-crypto?/testing"]
[dependencies]
anyhow = { workspace = true }

View File

@@ -537,7 +537,7 @@ mod test {
// And the room is stored in the client so it can be extracted when needed
let response = response_with_room(room_id, room).await;
client.process_sliding_sync(&response, vec![]).await.unwrap();
client.process_sliding_sync(&response).await.unwrap();
// When we construct a timeline event from it
let timeline_item = EventTimelineItem::from_latest_event(&ss_room, event).await.unwrap();

View File

@@ -122,7 +122,7 @@ mod tests {
let mut room = v4::SlidingSyncRoom::new();
room.timeline.push(event.event);
let response = response_with_room(room_id, room).await;
client.process_sliding_sync(&response, vec![]).await.unwrap();
client.process_sliding_sync(&response).await.unwrap();
}
fn message_event(

View File

@@ -17,7 +17,7 @@ rustdoc-args = ["--cfg", "docsrs"]
[features]
default = ["e2e-encryption", "automatic-room-key-forwarding", "sqlite", "native-tls"]
testing = ["matrix-sdk-sqlite?/testing"]
testing = ["matrix-sdk-sqlite?/testing", "matrix-sdk-indexeddb?/testing", "matrix-sdk-base/testing"]
e2e-encryption = [
"matrix-sdk-base/e2e-encryption",

View File

@@ -947,8 +947,8 @@ impl Encryption {
}
}
/// Testing purposes only.
#[cfg(any(test, feature = "testing"))]
/// Testing purposees only.
pub async fn uploaded_key_count(&self) -> Result<u64> {
let olm_machine = self.client.olm_machine().await;
let olm_machine = olm_machine.as_ref().ok_or(Error::AuthenticationRequired)?;

View File

@@ -14,27 +14,14 @@ impl Client {
Ok(SlidingSync::builder(id.into(), self.clone())?)
}
/// Handle all the e2ee information provided in a sliding sync response.
#[cfg(feature = "e2e-encryption")]
pub(crate) async fn process_sliding_sync_e2ee(
&self,
extensions: &v4::Extensions,
) -> Result<Vec<Raw<AnyToDeviceEvent>>> {
Ok(self.base_client().process_sliding_sync_e2ee(extensions).await?)
}
/// Handle all the information provided in a sliding sync response, except
/// for the e2ee bits that are handled by `process_sliding_sync_e2ee`
/// (and which results can be passed as the second argument).
/// for the e2ee bits.
///
/// If you need to handle encryption too, use the internal
/// `SlidingSyncResponseProcessor` instead.
#[instrument(skip(self, response))]
pub async fn process_sliding_sync(
&self,
response: &v4::Response,
to_device_events: Vec<Raw<AnyToDeviceEvent>>,
) -> Result<SyncResponse> {
let mut response = self.base_client().process_sliding_sync(response).await?;
response.to_device.extend(to_device_events);
pub async fn process_sliding_sync(&self, response: &v4::Response) -> Result<SyncResponse> {
let response = self.base_client().process_sliding_sync(response).await?;
debug!("done processing on base_client");
self.handle_sync_response(&response).await?;
@@ -42,3 +29,47 @@ impl Client {
Ok(response)
}
}
/// Small helper to handle a `SlidingSync` response's sub parts.
///
/// This will properly handle the encryption and the room response
/// independently, if needs be, making sure that both are properly processed by
/// event handlers.
#[must_use]
pub(crate) struct SlidingSyncResponseProcessor {
client: Client,
to_device_events: Vec<Raw<AnyToDeviceEvent>>,
response: Option<SyncResponse>,
}
impl SlidingSyncResponseProcessor {
pub fn new(client: Client) -> Self {
Self { client, to_device_events: Vec::new(), response: None }
}
#[cfg(feature = "e2e-encryption")]
pub async fn handle_encryption(&mut self, extensions: &v4::Extensions) -> Result<()> {
// This is an internal API misuse if this is triggered (calling
// handle_room_response before this function), so panic is fine.
assert!(self.response.is_none());
self.to_device_events =
self.client.base_client().process_sliding_sync_e2ee(extensions).await?;
Ok(())
}
pub async fn handle_room_response(&mut self, response: &v4::Response) -> Result<()> {
self.response = Some(self.client.base_client().process_sliding_sync(response).await?);
Ok(())
}
pub async fn process_and_take_response(mut self) -> Result<SyncResponse> {
let mut response = self.response.take().unwrap_or_default();
response.to_device.extend(self.to_device_events);
self.client.handle_sync_response(&response).await?;
Ok(response)
}
}

View File

@@ -34,11 +34,9 @@ use std::{
use async_stream::stream;
pub use builder::*;
pub use client::*;
pub use error::*;
use futures_core::stream::Stream;
pub use list::*;
use matrix_sdk_base::sync::SyncResponse;
pub use room::*;
use ruma::{
api::client::{
@@ -60,7 +58,9 @@ use self::{
cache::restore_sliding_sync_state,
sticky_parameters::{LazyTransactionId, SlidingSyncStickyManager, StickyData},
};
use crate::{config::RequestConfig, Client, Result};
use crate::{
config::RequestConfig, sliding_sync::client::SlidingSyncResponseProcessor, Client, Result,
};
/// The Sliding Sync instance.
///
@@ -280,14 +280,12 @@ impl SlidingSync {
// `sliding_sync_response` is vital, so it must be done somewhere; for now it
// happens here.
let to_device_events = Vec::new();
let mut response_processor = SlidingSyncResponseProcessor::new(self.inner.client.clone());
#[cfg(feature = "e2e-encryption")]
let to_device_events = if self.is_e2ee_enabled() {
self.inner.client.process_sliding_sync_e2ee(&sliding_sync_response.extensions).await?
} else {
to_device_events
};
if self.is_e2ee_enabled() {
response_processor.handle_encryption(&sliding_sync_response.extensions).await?
}
// Only handle the room's subsection of the response, if this sliding sync was
// configured to do so. That's because even when not requesting it,
@@ -295,16 +293,11 @@ impl SlidingSync {
// unrelated to the current connection's parameters.
//
// NOTE: SS proxy workaround.
let handle_room_response = {
!self.inner.sticky.read().unwrap().data().room_subscriptions.is_empty()
|| !self.inner.lists.read().await.is_empty()
};
if self.must_process_rooms_response().await {
response_processor.handle_room_response(&sliding_sync_response).await?
}
let mut sync_response = if handle_room_response {
self.inner.client.process_sliding_sync(&sliding_sync_response, to_device_events).await?
} else {
assign!(SyncResponse::default(), { to_device: to_device_events })
};
let mut sync_response = response_processor.process_and_take_response().await?;
debug!(?sync_response, "Sliding Sync response has been handled by the client");
@@ -489,12 +482,20 @@ impl SlidingSync {
))
}
#[cfg(feature = "e2e-encryption")]
/// Is the e2ee extension enabled for this sliding sync instance?
#[cfg(feature = "e2e-encryption")]
fn is_e2ee_enabled(&self) -> bool {
self.inner.sticky.read().unwrap().data().extensions.e2ee.enabled == Some(true)
}
/// Should we process the room's subpart of a response?
async fn must_process_rooms_response(&self) -> bool {
// We consider that we must, if there's any room subscription or there's any
// list.
!self.inner.sticky.read().unwrap().data().room_subscriptions.is_empty()
|| !self.inner.lists.read().await.is_empty()
}
#[instrument(skip_all, fields(pos))]
async fn sync_once(&self) -> Result<UpdateSummary> {
let (request, request_config, requested_room_unsubscriptions) =
@@ -1625,6 +1626,7 @@ mod tests {
}
#[async_test]
#[cfg(feature = "e2e-encryption")]
async fn test_process_only_encryption_events() -> Result<()> {
let room = owned_room_id!("!croissant:example.org");

View File

@@ -347,7 +347,7 @@ fn run_wasm_pack_tests(cmd: Option<WasmFeatureSet>) -> Result<()> {
WasmFeatureSet::MatrixSdkIndexeddbStores,
(
"crates/matrix-sdk",
"--no-default-features --features js,indexeddb,e2e-encryption,rustls-tls --lib",
"--no-default-features --features js,indexeddb,e2e-encryption,rustls-tls,testing --lib",
),
),
(