From 2acc13ed2cdb465bb8276f97ee49c5ebf9d7a485 Mon Sep 17 00:00:00 2001 From: Benjamin Bouvier Date: Mon, 10 Jul 2023 18:06:13 +0200 Subject: [PATCH] feat: add a new `NotificationClient` API :mailbox_with_mail: (#2235) * chore: rename EncryptionSyncMode variants * feat: split the encryption sync modes into two different functions * feat: make locking optional in the `EncryptionSync` * feat: experimental notification client that retries decryption if it failed the first time * fix: don't iloop retrying decryption * chore: helper to convert from bool to `WithLocking` * feat: don't loop and just retry decryption of the notification event linearly * feat: remove unused set_notification_delegate Dead code is dead. * ffi: get rid of `get_notification_item` and introduce the `NotificationClient` * fmt * feat: don't swallow encryption sync errors when retrying notification event decryption * keeping a tidy commit history is NP-hard * will i ever learn * chore: enable experimental-notification-client in the FFI crate * test: add basic integration test for the common path * Address first batch of review comments, thanks Jonas! --- bindings/matrix-sdk-ffi/Cargo.toml | 2 +- bindings/matrix-sdk-ffi/src/client.rs | 61 +--- .../matrix-sdk-ffi/src/encryption_sync.rs | 52 +--- bindings/matrix-sdk-ffi/src/error.rs | 8 +- bindings/matrix-sdk-ffi/src/notification.rs | 152 +++++----- crates/matrix-sdk-ui/Cargo.toml | 5 +- .../matrix-sdk-ui/src/encryption_sync/mod.rs | 189 +++++++----- crates/matrix-sdk-ui/src/lib.rs | 2 + .../matrix-sdk-ui/src/notification_client.rs | 270 ++++++++++++++++++ .../tests/integration/encryption_sync.rs | 150 ++++++---- .../matrix-sdk-ui/tests/integration/main.rs | 2 + .../tests/integration/notification_client.rs | 94 ++++++ 12 files changed, 693 insertions(+), 294 deletions(-) create mode 100644 crates/matrix-sdk-ui/src/notification_client.rs create mode 100644 crates/matrix-sdk-ui/tests/integration/notification_client.rs diff --git a/bindings/matrix-sdk-ffi/Cargo.toml b/bindings/matrix-sdk-ffi/Cargo.toml index af1a57ce8..8ed330a93 100644 --- a/bindings/matrix-sdk-ffi/Cargo.toml +++ b/bindings/matrix-sdk-ffi/Cargo.toml @@ -29,7 +29,7 @@ eyeball-im = { workspace = true } extension-trait = "1.0.1" futures-core = { workspace = true } futures-util = { workspace = true } -matrix-sdk-ui = { path = "../../crates/matrix-sdk-ui", default-features = false, features = ["e2e-encryption", "experimental-room-list", "experimental-encryption-sync"] } +matrix-sdk-ui = { path = "../../crates/matrix-sdk-ui", default-features = false, features = ["e2e-encryption", "experimental-room-list", "experimental-encryption-sync", "experimental-notification-client"] } mime = "0.3.16" # FIXME: we currently can't feature flag anything in the api.udl, therefore we must enforce experimental-sliding-sync being exposed here.. # see https://github.com/matrix-org/matrix-rust-sdk/issues/1014 diff --git a/bindings/matrix-sdk-ffi/src/client.rs b/bindings/matrix-sdk-ffi/src/client.rs index 7c1a482bc..6fb333abc 100644 --- a/bindings/matrix-sdk-ffi/src/client.rs +++ b/bindings/matrix-sdk-ffi/src/client.rs @@ -24,10 +24,7 @@ use matrix_sdk::{ }, Client as MatrixClient, }; -use ruma::{ - push::{HttpPusherData as RumaHttpPusherData, PushFormat as RumaPushFormat}, - RoomId, -}; +use ruma::push::{HttpPusherData as RumaHttpPusherData, PushFormat as RumaPushFormat}; use serde_json::Value; use tokio::sync::broadcast::error::RecvError; use tracing::{debug, error}; @@ -35,7 +32,7 @@ use url::Url; use super::{room::Room, session_verification::SessionVerificationController, RUNTIME}; use crate::{ - client, notification::NotificationItem, notification_settings::NotificationSettings, + client, notification::NotificationClientBuilder, notification_settings::NotificationSettings, ClientError, }; @@ -104,11 +101,6 @@ pub trait ClientDelegate: Sync + Send { fn did_receive_auth_error(&self, is_soft_logout: bool); } -#[uniffi::export(callback_interface)] -pub trait NotificationDelegate: Sync + Send { - fn did_receive_notification(&self, notification: NotificationItem); -} - #[uniffi::export(callback_interface)] pub trait ProgressWatcher: Send + Sync { fn transmission_progress(&self, progress: TransmissionProgress); @@ -133,7 +125,6 @@ impl From for TransmissionProgress { pub struct Client { pub(crate) inner: MatrixClient, delegate: Arc>>>, - notification_delegate: Arc>>>, session_verification_controller: Arc>>, } @@ -159,7 +150,6 @@ impl Client { let client = Client { inner: sdk_client, delegate: Arc::new(RwLock::new(None)), - notification_delegate: Arc::new(RwLock::new(None)), session_verification_controller, }; @@ -594,51 +584,8 @@ impl Client { }) } - /// Sets a notification delegate and a handler. - /// - /// The sliding sync requires to have registered m.room.member with value - /// $ME and m.room.power_levels to be able to intercept the events. - /// This function blocks execution and should be dispatched concurrently. - pub fn set_notification_delegate( - &self, - notification_delegate: Option>, - ) { - *self.notification_delegate.write().unwrap() = notification_delegate; - let notification_delegate = Arc::clone(&self.notification_delegate); - let handler = move |notification, room: SdkRoom, _| { - let notification_delegate = Arc::clone(¬ification_delegate); - async move { - if let Ok(Some(notification_item)) = - NotificationItem::new_from_notification(notification, room).await - { - if let Some(notification_delegate) = - notification_delegate.read().unwrap().as_ref() - { - notification_delegate.did_receive_notification(notification_item); - } - } - } - }; - RUNTIME.block_on(async move { - self.inner.register_notification_handler(handler).await; - }) - } - - pub fn get_notification_item( - &self, - room_id: String, - event_id: String, - filter_by_push_rules: bool, - ) -> Result, ClientError> { - RUNTIME.block_on(async move { - // We may also need to do a sync here since this may fail if the keys are not - // valid anymore - let room_id = RoomId::parse(room_id)?; - let room = self.inner.get_room(&room_id).context("Room not found")?; - let notification = - NotificationItem::new_from_event_id(&event_id, room, filter_by_push_rules).await?; - Ok(notification) - }) + pub fn notification_client(&self) -> Arc { + NotificationClientBuilder::new(self.inner.clone()) } pub fn get_notification_settings(&self) -> Arc { diff --git a/bindings/matrix-sdk-ffi/src/encryption_sync.rs b/bindings/matrix-sdk-ffi/src/encryption_sync.rs index 2954d42c2..7659b10ba 100644 --- a/bindings/matrix-sdk-ffi/src/encryption_sync.rs +++ b/bindings/matrix-sdk-ffi/src/encryption_sync.rs @@ -1,7 +1,7 @@ use std::sync::Arc; use futures_util::{pin_mut, StreamExt as _}; -use matrix_sdk_ui::encryption_sync::{EncryptionSync as MatrixEncryptionSync, EncryptionSyncMode}; +use matrix_sdk_ui::encryption_sync::EncryptionSync as MatrixEncryptionSync; use tracing::{error, warn}; use crate::{client::Client, error::ClientError, task_handle::TaskHandle, RUNTIME}; @@ -75,23 +75,6 @@ impl EncryptionSync { } } -impl Client { - fn encryption_sync( - &self, - id: String, - listener: Box, - mode: EncryptionSyncMode, - ) -> Result, ClientError> { - RUNTIME.block_on(async move { - let inner = Arc::new(MatrixEncryptionSync::new(id, self.inner.clone(), mode).await?); - - let handle = EncryptionSync::start(inner.clone(), listener); - - Ok(Arc::new(EncryptionSync { _handle: handle, sync: inner })) - }) - } -} - #[uniffi::export] impl Client { /// Must be called to get the encryption loop running. @@ -110,25 +93,20 @@ impl Client { id: String, listener: Box, ) -> Result, ClientError> { - self.encryption_sync(id, listener, EncryptionSyncMode::NeverStop) - } + RUNTIME.block_on(async move { + let inner = Arc::new( + MatrixEncryptionSync::new( + id, + self.inner.clone(), + None, + matrix_sdk_ui::encryption_sync::WithLocking::Yes, + ) + .await?, + ); - /// Encryption loop for a notification process. - /// - /// A fixed number of iterations can be given, to limit the time spent in - /// that loop. - /// - /// This should be avoided, whenever possible, and be used only in - /// situations where the encryption sync loop needs to run from multiple - /// processes at the same time (on iOS for instance, where notifications - /// are handled in a separate process). If you aren't in such a - /// situation, prefer using `Client::room_list(true)`. - pub fn notification_encryption_sync( - &self, - id: String, - listener: Box, - num_iters: u8, - ) -> Result, ClientError> { - self.encryption_sync(id, listener, EncryptionSyncMode::RunFixedIterations(num_iters)) + let handle = EncryptionSync::start(inner.clone(), listener); + + Ok(Arc::new(EncryptionSync { _handle: handle, sync: inner })) + }) } } diff --git a/bindings/matrix-sdk-ffi/src/error.rs b/bindings/matrix-sdk-ffi/src/error.rs index 9e6955760..52ee4ce93 100644 --- a/bindings/matrix-sdk-ffi/src/error.rs +++ b/bindings/matrix-sdk-ffi/src/error.rs @@ -4,7 +4,7 @@ use matrix_sdk::{ self, encryption::CryptoStoreError, HttpError, IdParseError, NotificationSettingsError as SdkNotificationSettingsError, StoreError, }; -use matrix_sdk_ui::{encryption_sync, timeline}; +use matrix_sdk_ui::{encryption_sync, notification_client, timeline}; #[derive(Debug, thiserror::Error)] pub enum ClientError { @@ -84,6 +84,12 @@ impl From for ClientError { } } +impl From for ClientError { + fn from(e: notification_client::Error) -> Self { + Self::new(e) + } +} + #[derive(Debug, thiserror::Error, uniffi::Error)] #[uniffi(flat_error)] pub enum RoomError { diff --git a/bindings/matrix-sdk-ffi/src/notification.rs b/bindings/matrix-sdk-ffi/src/notification.rs index 1ff7afb9e..7f1a38d7e 100644 --- a/bindings/matrix-sdk-ffi/src/notification.rs +++ b/bindings/matrix-sdk-ffi/src/notification.rs @@ -1,12 +1,12 @@ use std::sync::Arc; -use matrix_sdk::room::Room; -use ruma::{ - api::client::push::get_notifications::v3::Notification, events::AnySyncTimelineEvent, - push::Action, EventId, +use matrix_sdk_ui::notification_client::{ + NotificationClient as MatrixNotificationClient, + NotificationClientBuilder as MatrixNotificationClientBuilder, }; +use ruma::{EventId, RoomId}; -use crate::event::TimelineEvent; +use crate::{error::ClientError, event::TimelineEvent, helpers::unwrap_or_clone_arc, RUNTIME}; #[derive(uniffi::Record)] pub struct NotificationSenderInfo { @@ -16,7 +16,6 @@ pub struct NotificationSenderInfo { #[derive(uniffi::Record)] pub struct NotificationRoomInfo { - pub id: String, pub display_name: String, pub avatar_url: Option, pub canonical_alias: Option, @@ -35,70 +34,83 @@ pub struct NotificationItem { pub is_noisy: bool, } -impl NotificationItem { - pub(crate) async fn new_from_notification( - notification: Notification, - room: Room, - ) -> anyhow::Result> { - if !notification.actions.iter().any(|a| a.should_notify()) { - return Ok(None); - } +#[derive(Clone, uniffi::Object)] +pub struct NotificationClientBuilder { + builder: MatrixNotificationClientBuilder, +} - let deserialized_event = notification.event.deserialize()?; - - Self::new(deserialized_event, room, notification.actions).await.map(Some) - } - - pub(crate) async fn new_from_event_id( - event_id: &str, - room: Room, - filter_by_push_rules: bool, - ) -> anyhow::Result> { - let event_id = EventId::parse(event_id)?; - let ruma_event = room.event(&event_id).await?; - - if filter_by_push_rules && !ruma_event.push_actions.iter().any(|a| a.should_notify()) { - return Ok(None); - } - - let event: AnySyncTimelineEvent = ruma_event.event.deserialize()?.into(); - Self::new(event, room, ruma_event.push_actions).await.map(Some) - } - - async fn new( - event: AnySyncTimelineEvent, - room: Room, - actions: Vec, - ) -> anyhow::Result { - let sender = match &room { - Room::Invited(invited) => invited.invite_details().await?.inviter, - _ => room.get_member(event.sender()).await?, - }; - let mut sender_display_name = None; - let mut sender_avatar_url = None; - if let Some(sender) = sender { - sender_display_name = sender.display_name().map(|s| s.to_owned()); - sender_avatar_url = sender.avatar_url().map(|s| s.to_string()); - } - - let is_noisy = actions.iter().any(|a| a.sound().is_some()); - - let room_info = NotificationRoomInfo { - id: room.room_id().to_string(), - display_name: room.display_name().await?.to_string(), - avatar_url: room.avatar_url().map(|s| s.to_string()), - canonical_alias: room.canonical_alias().map(|c| c.to_string()), - joined_members_count: room.joined_members_count(), - is_encrypted: room.is_encrypted().await.ok(), - is_direct: room.is_direct().await?, - }; - - let sender_info = NotificationSenderInfo { - display_name: sender_display_name, - avatar_url: sender_avatar_url, - }; - - let item = Self { event: Arc::new(TimelineEvent(event)), sender_info, room_info, is_noisy }; - Ok(item) +impl NotificationClientBuilder { + pub(crate) fn new(client: matrix_sdk::Client) -> Arc { + Arc::new(Self { builder: MatrixNotificationClient::builder(client) }) + } +} + +#[uniffi::export] +impl NotificationClientBuilder { + /// Filter out the notification event according to the push rules present in + /// the event. + pub fn filter_by_push_rules(self: Arc) -> Arc { + let this = unwrap_or_clone_arc(self); + let builder = this.builder.filter_by_push_rules(); + Arc::new(Self { builder }) + } + + /// Automatically retry decryption once, if the notification was received + /// encrypted. + /// + /// The boolean indicates whether we're making use of a cross-process lock + /// for the crypto-store. This should be set to true, if and only if, + /// the notification is received in a process that's different from the + /// main app. + pub fn retry_decryption(self: Arc, with_cross_process_lock: bool) -> Arc { + let this = unwrap_or_clone_arc(self); + let builder = this.builder.retry_decryption(with_cross_process_lock); + Arc::new(Self { builder }) + } + + pub fn finish(self: Arc) -> Arc { + let this = unwrap_or_clone_arc(self); + Arc::new(NotificationClient { inner: this.builder.build() }) + } +} + +#[derive(uniffi::Object)] +pub struct NotificationClient { + inner: MatrixNotificationClient, +} + +#[uniffi::export] +impl NotificationClient { + pub fn get_notification( + &self, + room_id: String, + event_id: String, + ) -> Result, ClientError> { + let room_id = RoomId::parse(room_id)?; + let event_id = EventId::parse(event_id)?; + RUNTIME.block_on(async move { + let item = self + .inner + .get_notification(&room_id, &event_id) + .await + .map_err(ClientError::from)?; + + Ok(item.map(|item| NotificationItem { + event: Arc::new(TimelineEvent(item.event)), + sender_info: NotificationSenderInfo { + display_name: item.sender_display_name, + avatar_url: item.sender_avatar_url, + }, + room_info: NotificationRoomInfo { + display_name: item.room_display_name, + avatar_url: item.room_avatar_url, + canonical_alias: item.room_canonical_alias, + joined_members_count: item.joined_members_count, + is_encrypted: item.is_room_encrypted, + is_direct: item.is_direct_message_room, + }, + is_noisy: item.is_noisy, + })) + }) } } diff --git a/crates/matrix-sdk-ui/Cargo.toml b/crates/matrix-sdk-ui/Cargo.toml index d07d065bb..4f39ef1f2 100644 --- a/crates/matrix-sdk-ui/Cargo.toml +++ b/crates/matrix-sdk-ui/Cargo.toml @@ -4,15 +4,16 @@ version = "0.6.0" edition = "2021" [features] -default = ["e2e-encryption", "native-tls", "experimental-room-list", "experimental-encryption-sync"] +default = ["e2e-encryption", "native-tls", "experimental-room-list", "experimental-encryption-sync", "experimental-notification-client"] e2e-encryption = ["matrix-sdk/e2e-encryption"] native-tls = ["matrix-sdk/native-tls"] rustls-tls = ["matrix-sdk/rustls-tls"] -experimental-room-list = ["experimental-sliding-sync", "dep:async-stream", "dep:eyeball-im-util"] experimental-encryption-sync = ["experimental-sliding-sync", "dep:async-stream"] +experimental-notification-client = ["experimental-encryption-sync"] +experimental-room-list = ["experimental-sliding-sync", "dep:async-stream", "dep:eyeball-im-util"] experimental-sliding-sync = ["matrix-sdk/experimental-sliding-sync"] testing = ["dep:eyeball-im-util"] diff --git a/crates/matrix-sdk-ui/src/encryption_sync/mod.rs b/crates/matrix-sdk-ui/src/encryption_sync/mod.rs index a50d9c484..53f342c3a 100644 --- a/crates/matrix-sdk-ui/src/encryption_sync/mod.rs +++ b/crates/matrix-sdk-ui/src/encryption_sync/mod.rs @@ -36,13 +36,20 @@ use matrix_sdk_crypto::store::locks::CryptoStoreLock; use ruma::{api::client::sync::sync_events::v4, assign}; use tracing::{error, trace}; -#[derive(Clone, Copy)] -pub enum EncryptionSyncMode { - /// Run the loop for a fixed amount of iterations. - RunFixedIterations(u8), +/// Should the `EncryptionSync` make use of locking? +pub enum WithLocking { + Yes, + No, +} - /// Never stop running the loop, except if asked to stop. - NeverStop, +impl From for WithLocking { + fn from(value: bool) -> Self { + if value { + Self::Yes + } else { + Self::No + } + } } /// High-level helper for synchronizing encryption events using sliding sync. @@ -51,7 +58,7 @@ pub enum EncryptionSyncMode { pub struct EncryptionSync { client: Client, sliding_sync: SlidingSync, - mode: EncryptionSyncMode, + with_locking: bool, } impl EncryptionSync { @@ -64,7 +71,8 @@ impl EncryptionSync { pub async fn new( process_id: String, client: Client, - mode: EncryptionSyncMode, + proxy_and_network_timeouts: Option<(Duration, Duration)>, + with_locking: WithLocking, ) -> Result { // Make sure to use the same `conn_id` and caching store identifier, whichever // process is running this sliding sync. There must be at most one @@ -77,26 +85,109 @@ impl EncryptionSync { ) .with_e2ee_extension(assign!(v4::E2EEConfig::default(), { enabled: Some(true)})); - if matches!(mode, EncryptionSyncMode::RunFixedIterations(..)) { - builder = builder.with_timeouts(Duration::from_secs(4), Duration::from_secs(4)); + if let Some((proxy_timeout, network_timeout)) = proxy_and_network_timeouts { + builder = builder.with_timeouts(proxy_timeout, network_timeout); } let sliding_sync = builder.build().await.map_err(Error::SlidingSync)?; - // Gently try to set the cross-process lock on behalf of the user. - match client.encryption().enable_cross_process_store_lock(process_id).await { - Ok(()) | Err(matrix_sdk::Error::BadCryptoStoreState) => { - // Ignore; we've already set the crypto store lock to - // something, and that's sufficient as - // long as it uniquely identifies the process. - } - Err(err) => { - // Any other error is fatal - return Err(Error::ClientError(err)); + let with_locking = matches!(with_locking, WithLocking::Yes); + + if with_locking { + // Gently try to enable the cross-process lock on behalf of the user. + match client.encryption().enable_cross_process_store_lock(process_id).await { + Ok(()) | Err(matrix_sdk::Error::BadCryptoStoreState) => { + // Ignore; we've already set the crypto store lock to + // something, and that's sufficient as + // long as it uniquely identifies the process. + } + Err(err) => { + // Any other error is fatal + return Err(Error::ClientError(err)); + } + }; + } + + Ok(Self { client, sliding_sync, with_locking }) + } + + pub async fn run_fixed_iterations(self, num_iterations: u8) -> Result<(), Error> { + let sync = self.sliding_sync.sync(); + + pin_mut!(sync); + + let _lock_guard = if self.with_locking { + let mut lock_guard = + self.client.encryption().try_lock_store_once().await.map_err(Error::LockError)?; + + // Try to take the lock at the beginning; if it's busy, that means that another + // process already holds onto it, and as such we won't try to run the + // encryption sync loop at all (because we expect the other process to + // do so). + + if lock_guard.is_none() { + // If we can't acquire the cross-process lock on the first attempt, + // that means the main process is running, or its lease hasn't expired + // yet. In case it's the latter, wait a bit and retry. + tracing::debug!( + "Lock was already taken, and we're not the main loop; retrying in {}ms...", + CryptoStoreLock::LEASE_DURATION_MS + ); + + tokio::time::sleep(Duration::from_millis( + CryptoStoreLock::LEASE_DURATION_MS.into(), + )) + .await; + + lock_guard = self + .client + .encryption() + .try_lock_store_once() + .await + .map_err(Error::LockError)?; + + if lock_guard.is_none() { + tracing::debug!( + "Second attempt at locking outside the main app failed, aborting." + ); + return Ok(()); + } } + + lock_guard + } else { + None }; - Ok(Self { client, sliding_sync, mode }) + for _ in 0..num_iterations { + match sync.next().await { + Some(Ok(update_summary)) => { + // This API is only concerned with the e2ee and to-device extensions. + // Warn if anything weird has been received from the proxy. + if !update_summary.lists.is_empty() { + error!(?update_summary.lists, "unexpected non-empty list of lists in encryption sync API"); + } + if !update_summary.rooms.is_empty() { + error!(?update_summary.rooms, "unexpected non-empty list of rooms in encryption sync API"); + } + + // Cool cool, let's do it again. + trace!("Encryption sync received an update!"); + } + + Some(Err(err)) => { + trace!("Encryption sync stopped because of an error: {err:#}"); + return Err(Error::SlidingSync(err)); + } + + None => { + trace!("Encryption sync properly terminated."); + break; + } + } + } + + Ok(()) } /// Start synchronization. @@ -108,61 +199,15 @@ impl EncryptionSync { pin_mut!(sync); - let mut mode = self.mode; - loop { - let guard = match &mut mode { - EncryptionSyncMode::RunFixedIterations(ref mut val) => { - if *val == 0 { - // The previous attempt was the last one, stop now. - break; - } - // Soon. - *val -= 1; - - let mut guard = self - .client - .encryption() - .try_lock_store_once() - .await - .map_err(Error::LockError)?; - - if guard.is_none() { - // If we can't acquire the cross-process lock on the first attempt, - // that means the main process is running, or its lease hasn't expired - // yet. In case it's the latter, wait a bit and retry. - tracing::debug!( - "Lock was already taken, and we're not the main loop; retrying in {}ms...", - CryptoStoreLock::LEASE_DURATION_MS - ); - - tokio::time::sleep(Duration::from_millis( - CryptoStoreLock::LEASE_DURATION_MS.into(), - )) - .await; - - guard = self - .client - .encryption() - .try_lock_store_once() - .await - .map_err(Error::LockError)?; - - if guard.is_none() { - tracing::debug!("Second attempt at locking outside the main app failed, so aborting."); - return; - } - } - - guard - } - - EncryptionSyncMode::NeverStop => self - .client + let guard = if self.with_locking { + self.client .encryption() .spin_lock_store(Some(60000)) .await - .map_err(Error::LockError)?, + .map_err(Error::LockError)? + } else { + None }; match sync.next().await { diff --git a/crates/matrix-sdk-ui/src/lib.rs b/crates/matrix-sdk-ui/src/lib.rs index 8d4b2a5b0..d29f6d137 100644 --- a/crates/matrix-sdk-ui/src/lib.rs +++ b/crates/matrix-sdk-ui/src/lib.rs @@ -16,6 +16,8 @@ mod events; #[cfg(feature = "experimental-encryption-sync")] pub mod encryption_sync; +#[cfg(feature = "experimental-notification-client")] +pub mod notification_client; #[cfg(feature = "experimental-room-list")] pub mod room_list_service; pub mod timeline; diff --git a/crates/matrix-sdk-ui/src/notification_client.rs b/crates/matrix-sdk-ui/src/notification_client.rs new file mode 100644 index 000000000..87c25e03e --- /dev/null +++ b/crates/matrix-sdk-ui/src/notification_client.rs @@ -0,0 +1,270 @@ +// Copyright 2023 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for that specific language governing permissions and +// limitations under the License. + +use std::time::Duration; + +use matrix_sdk::{room::Room, Client}; +use matrix_sdk_base::StoreError; +use ruma::{events::AnySyncTimelineEvent, EventId, RoomId}; +use thiserror::Error; + +use crate::encryption_sync::{EncryptionSync, WithLocking}; + +/// A client specialized for handling push notifications received over the +/// network, for an app. +/// +/// In particular, it takes care of running a full decryption sync, in case the +/// event in the notification was impossible to decrypt beforehand. +pub struct NotificationClient { + /// SDK client. + client: Client, + + /// Should we retry decrypting an event, after it was impossible to decrypt + /// on the first attempt? + retry_decryption: bool, + + /// Should the encryption sync happening in case the notification event was + /// encrypted use a cross-process lock? + /// + /// Only meaningful if `retry_decryption` is true. + with_cross_process_lock: bool, + + /// Should we try to filter out the notification event according to the push + /// rules? + filter_by_push_rules: bool, +} + +impl NotificationClient { + /// Create a new builder for a notification client. + pub fn builder(client: Client) -> NotificationClientBuilder { + NotificationClientBuilder::new(client) + } + + /// Get a full notification, given a room id and event id. + pub async fn get_notification( + &self, + room_id: &RoomId, + event_id: &EventId, + ) -> Result, Error> { + // TODO(bnjbvr) invites don't have access to the room! Make a separate path to + // handle those? + let Some(room) = self.client.get_room(room_id) else { return Err(Error::UnknownRoom) }; + + let mut timeline_event = room.event(event_id).await?; + + if self.filter_by_push_rules + && !timeline_event.push_actions.iter().any(|a| a.should_notify()) + { + return Ok(None); + } + + let mut raw_event: AnySyncTimelineEvent = + timeline_event.event.deserialize().map_err(|_| Error::InvalidRumaEvent)?.into(); + + let event_type = raw_event.event_type(); + + let is_still_encrypted = + matches!(event_type, ruma::events::TimelineEventType::RoomEncrypted); + + #[cfg(feature = "unstable-msc3956")] + let is_still_encrypted = + is_still_encrypted || matches!(event_type, ruma::events::TimelineEventType::Encrypted); + + if is_still_encrypted && self.retry_decryption { + // The message is still encrypted, and the client is configured to retry + // decryption. + // + // Spawn an `EncryptionSync` that runs two iterations of the sliding sync loop: + // - the first iteration allows to get SS events as well as send e2ee requests. + // - the second one let the SS proxy forward events triggered by the sending of + // e2ee requests. + // + // Keep timeouts small for both, since we might be short on time. + + let with_locking = WithLocking::from(self.with_cross_process_lock); + + let encryption_sync = EncryptionSync::new( + "notifications".to_owned(), + self.client.clone(), + Some((Duration::from_secs(3), Duration::from_secs(1))), + with_locking, + ) + .await; + + // Just log out errors, but don't have them abort the notification processing: + // an undecrypted notification is still better than no + // notifications. + + match encryption_sync { + Ok(sync) => match sync.run_fixed_iterations(2).await { + Ok(()) => match room.decrypt_event(timeline_event.event.cast_ref()).await { + Ok(decrypted_event) => { + timeline_event = decrypted_event; + raw_event = timeline_event + .event + .deserialize() + .map_err(|_| Error::InvalidRumaEvent)? + .into(); + } + Err(err) => { + tracing::warn!( + "error when retrying decryption in get_notification: {err:#}" + ); + } + }, + Err(err) => { + tracing::warn!( + "error when running encryption_sync in get_notification: {err:#}" + ); + } + }, + Err(err) => { + tracing::warn!( + "error when building encryption_sync in get_notification: {err:#}", + ); + } + } + } + + let sender = match &room { + Room::Invited(invited) => invited.invite_details().await?.inviter, + _ => room.get_member(raw_event.sender()).await?, + }; + + let (sender_display_name, sender_avatar_url) = match sender { + Some(sender) => ( + sender.display_name().map(|s| s.to_owned()), + sender.avatar_url().map(|s| s.to_string()), + ), + None => (None, None), + }; + + let is_noisy = timeline_event.push_actions.iter().any(|a| a.sound().is_some()); + + let item = NotificationItem { + event: raw_event, + sender_display_name, + sender_avatar_url, + room_display_name: room.display_name().await?.to_string(), + room_avatar_url: room.avatar_url().map(|s| s.to_string()), + room_canonical_alias: room.canonical_alias().map(|c| c.to_string()), + is_direct_message_room: room.is_direct().await?, + is_room_encrypted: room.is_encrypted().await.ok(), + joined_members_count: room.joined_members_count(), + is_noisy, + }; + + Ok(Some(item)) + } +} + +/// Builder for a `NotificationClient`. +/// +/// Fields have the same meaning as in `NotificationClient`. +#[derive(Clone)] +pub struct NotificationClientBuilder { + client: Client, + retry_decryption: bool, + with_cross_process_lock: bool, + filter_by_push_rules: bool, +} + +impl NotificationClientBuilder { + fn new(client: Client) -> Self { + Self { + with_cross_process_lock: false, + filter_by_push_rules: false, + retry_decryption: false, + client, + } + } + + /// Filter out the notification event according to the push rules present in + /// the event. + pub fn filter_by_push_rules(mut self) -> Self { + self.filter_by_push_rules = true; + self + } + + /// Automatically retry decryption once, if the notification was received + /// encrypted. + /// + /// The boolean indicates whether we're making use of a cross-process lock + /// for the crypto-store. This should be set to true, if and only if, + /// the notification is received in a process that's different from the + /// main app. + pub fn retry_decryption(mut self, with_cross_process_lock: bool) -> Self { + self.retry_decryption = true; + self.with_cross_process_lock = with_cross_process_lock; + self + } + + /// Finishes configuring the `NotificationClient`. + pub fn build(self) -> NotificationClient { + NotificationClient { + client: self.client, + with_cross_process_lock: self.with_cross_process_lock, + filter_by_push_rules: self.filter_by_push_rules, + retry_decryption: self.retry_decryption, + } + } +} + +/// A notification with its full content. +pub struct NotificationItem { + /// Underlying Ruma event. + pub event: AnySyncTimelineEvent, + + /// Display name of the sender. + pub sender_display_name: Option, + /// Avatar URL of the sender. + pub sender_avatar_url: Option, + + /// Room display name. + pub room_display_name: String, + /// Room avatar URL. + pub room_avatar_url: Option, + /// Room canonical alias. + pub room_canonical_alias: Option, + /// Is this room encrypted? + pub is_room_encrypted: Option, + /// Is this room considered a direct message? + pub is_direct_message_room: bool, + /// Numbers of members who joined the room. + pub joined_members_count: u64, + + /// Is it a noisy notification? (i.e. does any push action contain a sound + /// action) + pub is_noisy: bool, +} + +/// An error for the [`NotificationClient`]. +#[derive(Debug, Error)] +pub enum Error { + /// The room associated to this event wasn't found. + #[error("unknown room for a notification")] + UnknownRoom, + + /// The Ruma event contained within this notification couldn't be parsed. + #[error("invalid ruma event")] + InvalidRumaEvent, + + /// An error forwarded from the client. + #[error(transparent)] + SdkError(#[from] matrix_sdk::Error), + + /// An error forwarded from the underlying state store. + #[error(transparent)] + StoreError(#[from] StoreError), +} diff --git a/crates/matrix-sdk-ui/tests/integration/encryption_sync.rs b/crates/matrix-sdk-ui/tests/integration/encryption_sync.rs index 05b45cec1..0b529e810 100644 --- a/crates/matrix-sdk-ui/tests/integration/encryption_sync.rs +++ b/crates/matrix-sdk-ui/tests/integration/encryption_sync.rs @@ -1,16 +1,24 @@ +use std::sync::Mutex; + use futures_util::{pin_mut, StreamExt as _}; use matrix_sdk::SlidingSync; use matrix_sdk_test::async_test; -use matrix_sdk_ui::encryption_sync::{EncryptionSync, EncryptionSyncMode}; +use matrix_sdk_ui::encryption_sync::{EncryptionSync, WithLocking}; +use serde_json::json; +use wiremock::{Match as _, Mock, MockGuard, MockServer, Request, ResponseTemplate}; -use crate::{logged_in_client, sliding_sync_then_assert_request_and_fake_response}; +use crate::{ + logged_in_client, + sliding_sync::{PartialSlidingSyncRequest, SlidingSyncMatcher}, + sliding_sync_then_assert_request_and_fake_response, +}; #[async_test] async fn test_smoke_encryption_sync_works() -> anyhow::Result<()> { let (client, server) = logged_in_client().await; let encryption_sync = - EncryptionSync::new("tests".to_owned(), client, EncryptionSyncMode::NeverStop).await?; + EncryptionSync::new("tests".to_owned(), client, None, WithLocking::Yes).await?; let stream = encryption_sync.sync(); pin_mut!(stream); @@ -127,38 +135,86 @@ async fn test_smoke_encryption_sync_works() -> anyhow::Result<()> { Ok(()) } +async fn setup_mocking_sliding_sync_server(server: &MockServer) -> MockGuard { + let pos = Mutex::new(0); + + Mock::given(SlidingSyncMatcher) + .respond_with(move |request: &Request| { + let partial_request: PartialSlidingSyncRequest = request.body_json().unwrap(); + // Repeat the transaction id in the response, to validate sticky parameters. + let mut pos = pos.lock().unwrap(); + *pos += 1; + let pos_as_str = (*pos).to_string(); + ResponseTemplate::new(200).set_body_json(json!({ + "txn_id": partial_request.txn_id, + "pos": pos_as_str + })) + }) + .mount_as_scoped(server) + .await +} + +async fn check_requests(server: MockServer, expected_requests: &[serde_json::Value]) { + let mut num_requests = 0; + + for request in &server.received_requests().await.expect("Request recording has been disabled") { + if !SlidingSyncMatcher.matches(request) { + continue; + } + + assert!( + num_requests < expected_requests.len(), + "unexpected extra requests received in the server" + ); + + let mut json_value = serde_json::from_slice::(&request.body).unwrap(); + + // Strip the transaction id, if present. + if let Some(root) = json_value.as_object_mut() { + root.remove("txn_id"); + } + + if let Err(error) = assert_json_diff::assert_json_matches_no_panic( + &json_value, + &expected_requests[num_requests], + assert_json_diff::Config::new(assert_json_diff::CompareMode::Strict), + ) { + dbg!(json_value); + panic!("{error}"); + } + + num_requests += 1; + } + + assert_eq!(num_requests, expected_requests.len(), "missing requests"); +} + #[async_test] async fn test_encryption_sync_one_fixed_iteration() -> anyhow::Result<()> { let (client, server) = logged_in_client().await; + let _guard = setup_mocking_sliding_sync_server(&server).await; + let encryption_sync = - EncryptionSync::new("tests".to_owned(), client, EncryptionSyncMode::RunFixedIterations(1)) - .await?; + EncryptionSync::new("tests".to_owned(), client, None, WithLocking::Yes).await?; - let stream = encryption_sync.sync(); - pin_mut!(stream); + // Run all the iterations. + encryption_sync.run_fixed_iterations(1).await?; - sliding_sync_then_assert_request_and_fake_response! { - [server, stream] - assert request = { - "conn_id": "encryption", - "extensions": { - "e2ee": { - "enabled": true - }, - "to_device": { - "enabled": true - } + // Check the requests are the ones we've expected. + let expected_requests = [json!({ + "conn_id": "encryption", + "extensions": { + "e2ee": { + "enabled": true + }, + "to_device": { + "enabled": true } - }, - respond with = { - "pos": "0" - }, - }; + } + })]; - // Only run once, so the next iteration on the stream should stop it. - assert!(stream.next().await.is_none()); - assert!(stream.next().await.is_none()); + check_requests(server, &expected_requests).await; Ok(()) } @@ -167,17 +223,17 @@ async fn test_encryption_sync_one_fixed_iteration() -> anyhow::Result<()> { async fn test_encryption_sync_two_fixed_iterations() -> anyhow::Result<()> { let (client, server) = logged_in_client().await; - let encryption_sync = - EncryptionSync::new("tests".to_owned(), client, EncryptionSyncMode::RunFixedIterations(2)) - .await?; + let _guard = setup_mocking_sliding_sync_server(&server).await; - let stream = encryption_sync.sync(); - pin_mut!(stream); + let encryption_sync = + EncryptionSync::new("tests".to_owned(), client, None, WithLocking::Yes).await?; + + encryption_sync.run_fixed_iterations(2).await?; // First iteration fills the whole request. - sliding_sync_then_assert_request_and_fake_response! { - [server, stream] - assert request = { + // Second iteration only sends non-sticky parameters. + let expected_requests = [ + json!({ "conn_id": "encryption", "extensions": { "e2ee": { @@ -187,26 +243,13 @@ async fn test_encryption_sync_two_fixed_iterations() -> anyhow::Result<()> { "enabled": true } } - }, - respond with = { - "pos": "0" - }, - }; - - // Second iteration only sends non-sticky parameters. - sliding_sync_then_assert_request_and_fake_response! { - [server, stream] - assert request = { + }), + json!({ "conn_id": "encryption", - }, - respond with = { - "pos": "1", - }, - }; + }), + ]; - // This was the last iteration, there should be no next one. - assert!(stream.next().await.is_none()); - assert!(stream.next().await.is_none()); + check_requests(server, &expected_requests).await; Ok(()) } @@ -216,8 +259,7 @@ async fn test_encryption_sync_always_reloads_todevice_token() -> anyhow::Result< let (client, server) = logged_in_client().await; let encryption_sync = - EncryptionSync::new("tests".to_owned(), client.clone(), EncryptionSyncMode::NeverStop) - .await?; + EncryptionSync::new("tests".to_owned(), client.clone(), None, WithLocking::Yes).await?; let stream = encryption_sync.sync(); pin_mut!(stream); diff --git a/crates/matrix-sdk-ui/tests/integration/main.rs b/crates/matrix-sdk-ui/tests/integration/main.rs index bbd8b1d14..42f14599b 100644 --- a/crates/matrix-sdk-ui/tests/integration/main.rs +++ b/crates/matrix-sdk-ui/tests/integration/main.rs @@ -28,6 +28,8 @@ use wiremock::{ #[cfg(feature = "experimental-encryption-sync")] mod encryption_sync; +#[cfg(feature = "experimental-notification-client")] +mod notification_client; #[cfg(feature = "experimental-room-list")] mod room_list_service; mod sliding_sync; diff --git a/crates/matrix-sdk-ui/tests/integration/notification_client.rs b/crates/matrix-sdk-ui/tests/integration/notification_client.rs new file mode 100644 index 000000000..15670d923 --- /dev/null +++ b/crates/matrix-sdk-ui/tests/integration/notification_client.rs @@ -0,0 +1,94 @@ +use std::time::Duration; + +use matrix_sdk::config::SyncSettings; +use matrix_sdk_test::{async_test, EventBuilder, JoinedRoomBuilder, TimelineTestEvent}; +use matrix_sdk_ui::notification_client::NotificationClient; +use ruma::{event_id, events::TimelineEventType, room_id, user_id}; +use serde_json::json; +use wiremock::{ + matchers::{header, method, path, path_regex}, + Mock, ResponseTemplate, +}; + +use crate::{logged_in_client, mock_encryption_state, mock_sync}; + +#[async_test] +async fn test_notification_client_simple() { + let room_id = room_id!("!a98sd12bjh:example.org"); + let (client, server) = logged_in_client().await; + + let sync_settings = SyncSettings::new().timeout(Duration::from_millis(3000)); + + let event_id = event_id!("$example_event_id"); + let sender = user_id!("@user:example.org"); + let event_json = json!({ + "content": { + "body": "Hello world!", + "msgtype": "m.text", + }, + "room_id": room_id.clone(), + "event_id": event_id, + "origin_server_ts": 152049794, + "sender": sender.clone(), + "type": "m.room.message", + }); + + let mut ev_builder = EventBuilder::new(); + ev_builder.add_joined_room( + JoinedRoomBuilder::new(room_id.clone()) + .add_timeline_event(TimelineTestEvent::Custom(event_json.clone())), + ); + + mock_sync(&server, ev_builder.build_json_sync_response(), None).await; + let _response = client.sync_once(sync_settings.clone()).await.unwrap(); + server.reset().await; + + let notification_client = NotificationClient::builder(client).build(); + + { + Mock::given(method("GET")) + .and(path(format!("/_matrix/client/r0/rooms/{room_id}/event/{event_id}"))) + .and(header("authorization", "Bearer 1234")) + .respond_with(ResponseTemplate::new(200).set_body_json(event_json)) + .mount(&server) + .await; + + Mock::given(method("GET")) + .and(path_regex(r"^/_matrix/client/r0/rooms/.*/members")) + .and(header("authorization", "Bearer 1234")) + .respond_with(ResponseTemplate::new(200).set_body_json(json!({ + "chunk": [ + { + "content": { + "avatar_url": null, + "displayname": "John Mastodon", + "membership": "join" + }, + "room_id": room_id.clone(), + "event_id": "$151800140517rfvjc:example.org", + "membership": "join", + "origin_server_ts": 151800140, + "sender": sender.clone(), + "state_key": sender, + "type": "m.room.member", + "unsigned": { + "age": 2970366, + } + } + ] + }))) + .mount(&server) + .await; + + mock_encryption_state(&server, false).await; + } + + let item = notification_client.get_notification(room_id, event_id).await.unwrap(); + + server.reset().await; + + let item = item.expect("the notification should be found"); + + assert_eq!(item.event.event_type(), TimelineEventType::RoomMessage); + assert_eq!(item.sender_display_name.as_deref(), Some("John Mastodon")); +}