feat: add a new NotificationClient API 📬 (#2235)

* chore: rename EncryptionSyncMode variants

* feat: split the encryption sync modes into two different functions

* feat: make locking optional in the `EncryptionSync`

* feat: experimental notification client that retries decryption if it failed the first time

* fix: don't iloop retrying decryption

* chore: helper to convert from bool to `WithLocking`

* feat: don't loop and just retry decryption of the notification event linearly

* feat: remove unused set_notification_delegate

Dead code is dead.

* ffi: get rid of `get_notification_item` and introduce the `NotificationClient`

* fmt

* feat: don't swallow encryption sync errors when retrying notification event decryption

* keeping a tidy commit history is NP-hard

* will i ever learn

* chore: enable experimental-notification-client in the FFI crate

* test: add basic integration test for the common path

* Address first batch of review comments, thanks Jonas!
This commit is contained in:
Benjamin Bouvier
2023-07-10 18:06:13 +02:00
committed by GitHub
parent a094e10b35
commit 2acc13ed2c
12 changed files with 693 additions and 294 deletions

View File

@@ -29,7 +29,7 @@ eyeball-im = { workspace = true }
extension-trait = "1.0.1"
futures-core = { workspace = true }
futures-util = { workspace = true }
matrix-sdk-ui = { path = "../../crates/matrix-sdk-ui", default-features = false, features = ["e2e-encryption", "experimental-room-list", "experimental-encryption-sync"] }
matrix-sdk-ui = { path = "../../crates/matrix-sdk-ui", default-features = false, features = ["e2e-encryption", "experimental-room-list", "experimental-encryption-sync", "experimental-notification-client"] }
mime = "0.3.16"
# FIXME: we currently can't feature flag anything in the api.udl, therefore we must enforce experimental-sliding-sync being exposed here..
# see https://github.com/matrix-org/matrix-rust-sdk/issues/1014

View File

@@ -24,10 +24,7 @@ use matrix_sdk::{
},
Client as MatrixClient,
};
use ruma::{
push::{HttpPusherData as RumaHttpPusherData, PushFormat as RumaPushFormat},
RoomId,
};
use ruma::push::{HttpPusherData as RumaHttpPusherData, PushFormat as RumaPushFormat};
use serde_json::Value;
use tokio::sync::broadcast::error::RecvError;
use tracing::{debug, error};
@@ -35,7 +32,7 @@ use url::Url;
use super::{room::Room, session_verification::SessionVerificationController, RUNTIME};
use crate::{
client, notification::NotificationItem, notification_settings::NotificationSettings,
client, notification::NotificationClientBuilder, notification_settings::NotificationSettings,
ClientError,
};
@@ -104,11 +101,6 @@ pub trait ClientDelegate: Sync + Send {
fn did_receive_auth_error(&self, is_soft_logout: bool);
}
#[uniffi::export(callback_interface)]
pub trait NotificationDelegate: Sync + Send {
fn did_receive_notification(&self, notification: NotificationItem);
}
#[uniffi::export(callback_interface)]
pub trait ProgressWatcher: Send + Sync {
fn transmission_progress(&self, progress: TransmissionProgress);
@@ -133,7 +125,6 @@ impl From<matrix_sdk::TransmissionProgress> for TransmissionProgress {
pub struct Client {
pub(crate) inner: MatrixClient,
delegate: Arc<RwLock<Option<Box<dyn ClientDelegate>>>>,
notification_delegate: Arc<RwLock<Option<Box<dyn NotificationDelegate>>>>,
session_verification_controller:
Arc<tokio::sync::RwLock<Option<SessionVerificationController>>>,
}
@@ -159,7 +150,6 @@ impl Client {
let client = Client {
inner: sdk_client,
delegate: Arc::new(RwLock::new(None)),
notification_delegate: Arc::new(RwLock::new(None)),
session_verification_controller,
};
@@ -594,51 +584,8 @@ impl Client {
})
}
/// Sets a notification delegate and a handler.
///
/// The sliding sync requires to have registered m.room.member with value
/// $ME and m.room.power_levels to be able to intercept the events.
/// This function blocks execution and should be dispatched concurrently.
pub fn set_notification_delegate(
&self,
notification_delegate: Option<Box<dyn NotificationDelegate>>,
) {
*self.notification_delegate.write().unwrap() = notification_delegate;
let notification_delegate = Arc::clone(&self.notification_delegate);
let handler = move |notification, room: SdkRoom, _| {
let notification_delegate = Arc::clone(&notification_delegate);
async move {
if let Ok(Some(notification_item)) =
NotificationItem::new_from_notification(notification, room).await
{
if let Some(notification_delegate) =
notification_delegate.read().unwrap().as_ref()
{
notification_delegate.did_receive_notification(notification_item);
}
}
}
};
RUNTIME.block_on(async move {
self.inner.register_notification_handler(handler).await;
})
}
pub fn get_notification_item(
&self,
room_id: String,
event_id: String,
filter_by_push_rules: bool,
) -> Result<Option<NotificationItem>, ClientError> {
RUNTIME.block_on(async move {
// We may also need to do a sync here since this may fail if the keys are not
// valid anymore
let room_id = RoomId::parse(room_id)?;
let room = self.inner.get_room(&room_id).context("Room not found")?;
let notification =
NotificationItem::new_from_event_id(&event_id, room, filter_by_push_rules).await?;
Ok(notification)
})
pub fn notification_client(&self) -> Arc<NotificationClientBuilder> {
NotificationClientBuilder::new(self.inner.clone())
}
pub fn get_notification_settings(&self) -> Arc<NotificationSettings> {

View File

@@ -1,7 +1,7 @@
use std::sync::Arc;
use futures_util::{pin_mut, StreamExt as _};
use matrix_sdk_ui::encryption_sync::{EncryptionSync as MatrixEncryptionSync, EncryptionSyncMode};
use matrix_sdk_ui::encryption_sync::EncryptionSync as MatrixEncryptionSync;
use tracing::{error, warn};
use crate::{client::Client, error::ClientError, task_handle::TaskHandle, RUNTIME};
@@ -75,23 +75,6 @@ impl EncryptionSync {
}
}
impl Client {
fn encryption_sync(
&self,
id: String,
listener: Box<dyn EncryptionSyncListener>,
mode: EncryptionSyncMode,
) -> Result<Arc<EncryptionSync>, ClientError> {
RUNTIME.block_on(async move {
let inner = Arc::new(MatrixEncryptionSync::new(id, self.inner.clone(), mode).await?);
let handle = EncryptionSync::start(inner.clone(), listener);
Ok(Arc::new(EncryptionSync { _handle: handle, sync: inner }))
})
}
}
#[uniffi::export]
impl Client {
/// Must be called to get the encryption loop running.
@@ -110,25 +93,20 @@ impl Client {
id: String,
listener: Box<dyn EncryptionSyncListener>,
) -> Result<Arc<EncryptionSync>, ClientError> {
self.encryption_sync(id, listener, EncryptionSyncMode::NeverStop)
}
RUNTIME.block_on(async move {
let inner = Arc::new(
MatrixEncryptionSync::new(
id,
self.inner.clone(),
None,
matrix_sdk_ui::encryption_sync::WithLocking::Yes,
)
.await?,
);
/// Encryption loop for a notification process.
///
/// A fixed number of iterations can be given, to limit the time spent in
/// that loop.
///
/// This should be avoided, whenever possible, and be used only in
/// situations where the encryption sync loop needs to run from multiple
/// processes at the same time (on iOS for instance, where notifications
/// are handled in a separate process). If you aren't in such a
/// situation, prefer using `Client::room_list(true)`.
pub fn notification_encryption_sync(
&self,
id: String,
listener: Box<dyn EncryptionSyncListener>,
num_iters: u8,
) -> Result<Arc<EncryptionSync>, ClientError> {
self.encryption_sync(id, listener, EncryptionSyncMode::RunFixedIterations(num_iters))
let handle = EncryptionSync::start(inner.clone(), listener);
Ok(Arc::new(EncryptionSync { _handle: handle, sync: inner }))
})
}
}

View File

@@ -4,7 +4,7 @@ use matrix_sdk::{
self, encryption::CryptoStoreError, HttpError, IdParseError,
NotificationSettingsError as SdkNotificationSettingsError, StoreError,
};
use matrix_sdk_ui::{encryption_sync, timeline};
use matrix_sdk_ui::{encryption_sync, notification_client, timeline};
#[derive(Debug, thiserror::Error)]
pub enum ClientError {
@@ -84,6 +84,12 @@ impl From<timeline::Error> for ClientError {
}
}
impl From<notification_client::Error> for ClientError {
fn from(e: notification_client::Error) -> Self {
Self::new(e)
}
}
#[derive(Debug, thiserror::Error, uniffi::Error)]
#[uniffi(flat_error)]
pub enum RoomError {

View File

@@ -1,12 +1,12 @@
use std::sync::Arc;
use matrix_sdk::room::Room;
use ruma::{
api::client::push::get_notifications::v3::Notification, events::AnySyncTimelineEvent,
push::Action, EventId,
use matrix_sdk_ui::notification_client::{
NotificationClient as MatrixNotificationClient,
NotificationClientBuilder as MatrixNotificationClientBuilder,
};
use ruma::{EventId, RoomId};
use crate::event::TimelineEvent;
use crate::{error::ClientError, event::TimelineEvent, helpers::unwrap_or_clone_arc, RUNTIME};
#[derive(uniffi::Record)]
pub struct NotificationSenderInfo {
@@ -16,7 +16,6 @@ pub struct NotificationSenderInfo {
#[derive(uniffi::Record)]
pub struct NotificationRoomInfo {
pub id: String,
pub display_name: String,
pub avatar_url: Option<String>,
pub canonical_alias: Option<String>,
@@ -35,70 +34,83 @@ pub struct NotificationItem {
pub is_noisy: bool,
}
impl NotificationItem {
pub(crate) async fn new_from_notification(
notification: Notification,
room: Room,
) -> anyhow::Result<Option<Self>> {
if !notification.actions.iter().any(|a| a.should_notify()) {
return Ok(None);
}
#[derive(Clone, uniffi::Object)]
pub struct NotificationClientBuilder {
builder: MatrixNotificationClientBuilder,
}
let deserialized_event = notification.event.deserialize()?;
Self::new(deserialized_event, room, notification.actions).await.map(Some)
}
pub(crate) async fn new_from_event_id(
event_id: &str,
room: Room,
filter_by_push_rules: bool,
) -> anyhow::Result<Option<Self>> {
let event_id = EventId::parse(event_id)?;
let ruma_event = room.event(&event_id).await?;
if filter_by_push_rules && !ruma_event.push_actions.iter().any(|a| a.should_notify()) {
return Ok(None);
}
let event: AnySyncTimelineEvent = ruma_event.event.deserialize()?.into();
Self::new(event, room, ruma_event.push_actions).await.map(Some)
}
async fn new(
event: AnySyncTimelineEvent,
room: Room,
actions: Vec<Action>,
) -> anyhow::Result<Self> {
let sender = match &room {
Room::Invited(invited) => invited.invite_details().await?.inviter,
_ => room.get_member(event.sender()).await?,
};
let mut sender_display_name = None;
let mut sender_avatar_url = None;
if let Some(sender) = sender {
sender_display_name = sender.display_name().map(|s| s.to_owned());
sender_avatar_url = sender.avatar_url().map(|s| s.to_string());
}
let is_noisy = actions.iter().any(|a| a.sound().is_some());
let room_info = NotificationRoomInfo {
id: room.room_id().to_string(),
display_name: room.display_name().await?.to_string(),
avatar_url: room.avatar_url().map(|s| s.to_string()),
canonical_alias: room.canonical_alias().map(|c| c.to_string()),
joined_members_count: room.joined_members_count(),
is_encrypted: room.is_encrypted().await.ok(),
is_direct: room.is_direct().await?,
};
let sender_info = NotificationSenderInfo {
display_name: sender_display_name,
avatar_url: sender_avatar_url,
};
let item = Self { event: Arc::new(TimelineEvent(event)), sender_info, room_info, is_noisy };
Ok(item)
impl NotificationClientBuilder {
pub(crate) fn new(client: matrix_sdk::Client) -> Arc<Self> {
Arc::new(Self { builder: MatrixNotificationClient::builder(client) })
}
}
#[uniffi::export]
impl NotificationClientBuilder {
/// Filter out the notification event according to the push rules present in
/// the event.
pub fn filter_by_push_rules(self: Arc<Self>) -> Arc<Self> {
let this = unwrap_or_clone_arc(self);
let builder = this.builder.filter_by_push_rules();
Arc::new(Self { builder })
}
/// Automatically retry decryption once, if the notification was received
/// encrypted.
///
/// The boolean indicates whether we're making use of a cross-process lock
/// for the crypto-store. This should be set to true, if and only if,
/// the notification is received in a process that's different from the
/// main app.
pub fn retry_decryption(self: Arc<Self>, with_cross_process_lock: bool) -> Arc<Self> {
let this = unwrap_or_clone_arc(self);
let builder = this.builder.retry_decryption(with_cross_process_lock);
Arc::new(Self { builder })
}
pub fn finish(self: Arc<Self>) -> Arc<NotificationClient> {
let this = unwrap_or_clone_arc(self);
Arc::new(NotificationClient { inner: this.builder.build() })
}
}
#[derive(uniffi::Object)]
pub struct NotificationClient {
inner: MatrixNotificationClient,
}
#[uniffi::export]
impl NotificationClient {
pub fn get_notification(
&self,
room_id: String,
event_id: String,
) -> Result<Option<NotificationItem>, ClientError> {
let room_id = RoomId::parse(room_id)?;
let event_id = EventId::parse(event_id)?;
RUNTIME.block_on(async move {
let item = self
.inner
.get_notification(&room_id, &event_id)
.await
.map_err(ClientError::from)?;
Ok(item.map(|item| NotificationItem {
event: Arc::new(TimelineEvent(item.event)),
sender_info: NotificationSenderInfo {
display_name: item.sender_display_name,
avatar_url: item.sender_avatar_url,
},
room_info: NotificationRoomInfo {
display_name: item.room_display_name,
avatar_url: item.room_avatar_url,
canonical_alias: item.room_canonical_alias,
joined_members_count: item.joined_members_count,
is_encrypted: item.is_room_encrypted,
is_direct: item.is_direct_message_room,
},
is_noisy: item.is_noisy,
}))
})
}
}

View File

@@ -4,15 +4,16 @@ version = "0.6.0"
edition = "2021"
[features]
default = ["e2e-encryption", "native-tls", "experimental-room-list", "experimental-encryption-sync"]
default = ["e2e-encryption", "native-tls", "experimental-room-list", "experimental-encryption-sync", "experimental-notification-client"]
e2e-encryption = ["matrix-sdk/e2e-encryption"]
native-tls = ["matrix-sdk/native-tls"]
rustls-tls = ["matrix-sdk/rustls-tls"]
experimental-room-list = ["experimental-sliding-sync", "dep:async-stream", "dep:eyeball-im-util"]
experimental-encryption-sync = ["experimental-sliding-sync", "dep:async-stream"]
experimental-notification-client = ["experimental-encryption-sync"]
experimental-room-list = ["experimental-sliding-sync", "dep:async-stream", "dep:eyeball-im-util"]
experimental-sliding-sync = ["matrix-sdk/experimental-sliding-sync"]
testing = ["dep:eyeball-im-util"]

View File

@@ -36,13 +36,20 @@ use matrix_sdk_crypto::store::locks::CryptoStoreLock;
use ruma::{api::client::sync::sync_events::v4, assign};
use tracing::{error, trace};
#[derive(Clone, Copy)]
pub enum EncryptionSyncMode {
/// Run the loop for a fixed amount of iterations.
RunFixedIterations(u8),
/// Should the `EncryptionSync` make use of locking?
pub enum WithLocking {
Yes,
No,
}
/// Never stop running the loop, except if asked to stop.
NeverStop,
impl From<bool> for WithLocking {
fn from(value: bool) -> Self {
if value {
Self::Yes
} else {
Self::No
}
}
}
/// High-level helper for synchronizing encryption events using sliding sync.
@@ -51,7 +58,7 @@ pub enum EncryptionSyncMode {
pub struct EncryptionSync {
client: Client,
sliding_sync: SlidingSync,
mode: EncryptionSyncMode,
with_locking: bool,
}
impl EncryptionSync {
@@ -64,7 +71,8 @@ impl EncryptionSync {
pub async fn new(
process_id: String,
client: Client,
mode: EncryptionSyncMode,
proxy_and_network_timeouts: Option<(Duration, Duration)>,
with_locking: WithLocking,
) -> Result<Self, Error> {
// Make sure to use the same `conn_id` and caching store identifier, whichever
// process is running this sliding sync. There must be at most one
@@ -77,26 +85,109 @@ impl EncryptionSync {
)
.with_e2ee_extension(assign!(v4::E2EEConfig::default(), { enabled: Some(true)}));
if matches!(mode, EncryptionSyncMode::RunFixedIterations(..)) {
builder = builder.with_timeouts(Duration::from_secs(4), Duration::from_secs(4));
if let Some((proxy_timeout, network_timeout)) = proxy_and_network_timeouts {
builder = builder.with_timeouts(proxy_timeout, network_timeout);
}
let sliding_sync = builder.build().await.map_err(Error::SlidingSync)?;
// Gently try to set the cross-process lock on behalf of the user.
match client.encryption().enable_cross_process_store_lock(process_id).await {
Ok(()) | Err(matrix_sdk::Error::BadCryptoStoreState) => {
// Ignore; we've already set the crypto store lock to
// something, and that's sufficient as
// long as it uniquely identifies the process.
}
Err(err) => {
// Any other error is fatal
return Err(Error::ClientError(err));
let with_locking = matches!(with_locking, WithLocking::Yes);
if with_locking {
// Gently try to enable the cross-process lock on behalf of the user.
match client.encryption().enable_cross_process_store_lock(process_id).await {
Ok(()) | Err(matrix_sdk::Error::BadCryptoStoreState) => {
// Ignore; we've already set the crypto store lock to
// something, and that's sufficient as
// long as it uniquely identifies the process.
}
Err(err) => {
// Any other error is fatal
return Err(Error::ClientError(err));
}
};
}
Ok(Self { client, sliding_sync, with_locking })
}
pub async fn run_fixed_iterations(self, num_iterations: u8) -> Result<(), Error> {
let sync = self.sliding_sync.sync();
pin_mut!(sync);
let _lock_guard = if self.with_locking {
let mut lock_guard =
self.client.encryption().try_lock_store_once().await.map_err(Error::LockError)?;
// Try to take the lock at the beginning; if it's busy, that means that another
// process already holds onto it, and as such we won't try to run the
// encryption sync loop at all (because we expect the other process to
// do so).
if lock_guard.is_none() {
// If we can't acquire the cross-process lock on the first attempt,
// that means the main process is running, or its lease hasn't expired
// yet. In case it's the latter, wait a bit and retry.
tracing::debug!(
"Lock was already taken, and we're not the main loop; retrying in {}ms...",
CryptoStoreLock::LEASE_DURATION_MS
);
tokio::time::sleep(Duration::from_millis(
CryptoStoreLock::LEASE_DURATION_MS.into(),
))
.await;
lock_guard = self
.client
.encryption()
.try_lock_store_once()
.await
.map_err(Error::LockError)?;
if lock_guard.is_none() {
tracing::debug!(
"Second attempt at locking outside the main app failed, aborting."
);
return Ok(());
}
}
lock_guard
} else {
None
};
Ok(Self { client, sliding_sync, mode })
for _ in 0..num_iterations {
match sync.next().await {
Some(Ok(update_summary)) => {
// This API is only concerned with the e2ee and to-device extensions.
// Warn if anything weird has been received from the proxy.
if !update_summary.lists.is_empty() {
error!(?update_summary.lists, "unexpected non-empty list of lists in encryption sync API");
}
if !update_summary.rooms.is_empty() {
error!(?update_summary.rooms, "unexpected non-empty list of rooms in encryption sync API");
}
// Cool cool, let's do it again.
trace!("Encryption sync received an update!");
}
Some(Err(err)) => {
trace!("Encryption sync stopped because of an error: {err:#}");
return Err(Error::SlidingSync(err));
}
None => {
trace!("Encryption sync properly terminated.");
break;
}
}
}
Ok(())
}
/// Start synchronization.
@@ -108,61 +199,15 @@ impl EncryptionSync {
pin_mut!(sync);
let mut mode = self.mode;
loop {
let guard = match &mut mode {
EncryptionSyncMode::RunFixedIterations(ref mut val) => {
if *val == 0 {
// The previous attempt was the last one, stop now.
break;
}
// Soon.
*val -= 1;
let mut guard = self
.client
.encryption()
.try_lock_store_once()
.await
.map_err(Error::LockError)?;
if guard.is_none() {
// If we can't acquire the cross-process lock on the first attempt,
// that means the main process is running, or its lease hasn't expired
// yet. In case it's the latter, wait a bit and retry.
tracing::debug!(
"Lock was already taken, and we're not the main loop; retrying in {}ms...",
CryptoStoreLock::LEASE_DURATION_MS
);
tokio::time::sleep(Duration::from_millis(
CryptoStoreLock::LEASE_DURATION_MS.into(),
))
.await;
guard = self
.client
.encryption()
.try_lock_store_once()
.await
.map_err(Error::LockError)?;
if guard.is_none() {
tracing::debug!("Second attempt at locking outside the main app failed, so aborting.");
return;
}
}
guard
}
EncryptionSyncMode::NeverStop => self
.client
let guard = if self.with_locking {
self.client
.encryption()
.spin_lock_store(Some(60000))
.await
.map_err(Error::LockError)?,
.map_err(Error::LockError)?
} else {
None
};
match sync.next().await {

View File

@@ -16,6 +16,8 @@ mod events;
#[cfg(feature = "experimental-encryption-sync")]
pub mod encryption_sync;
#[cfg(feature = "experimental-notification-client")]
pub mod notification_client;
#[cfg(feature = "experimental-room-list")]
pub mod room_list_service;
pub mod timeline;

View File

@@ -0,0 +1,270 @@
// Copyright 2023 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for that specific language governing permissions and
// limitations under the License.
use std::time::Duration;
use matrix_sdk::{room::Room, Client};
use matrix_sdk_base::StoreError;
use ruma::{events::AnySyncTimelineEvent, EventId, RoomId};
use thiserror::Error;
use crate::encryption_sync::{EncryptionSync, WithLocking};
/// A client specialized for handling push notifications received over the
/// network, for an app.
///
/// In particular, it takes care of running a full decryption sync, in case the
/// event in the notification was impossible to decrypt beforehand.
pub struct NotificationClient {
/// SDK client.
client: Client,
/// Should we retry decrypting an event, after it was impossible to decrypt
/// on the first attempt?
retry_decryption: bool,
/// Should the encryption sync happening in case the notification event was
/// encrypted use a cross-process lock?
///
/// Only meaningful if `retry_decryption` is true.
with_cross_process_lock: bool,
/// Should we try to filter out the notification event according to the push
/// rules?
filter_by_push_rules: bool,
}
impl NotificationClient {
/// Create a new builder for a notification client.
pub fn builder(client: Client) -> NotificationClientBuilder {
NotificationClientBuilder::new(client)
}
/// Get a full notification, given a room id and event id.
pub async fn get_notification(
&self,
room_id: &RoomId,
event_id: &EventId,
) -> Result<Option<NotificationItem>, Error> {
// TODO(bnjbvr) invites don't have access to the room! Make a separate path to
// handle those?
let Some(room) = self.client.get_room(room_id) else { return Err(Error::UnknownRoom) };
let mut timeline_event = room.event(event_id).await?;
if self.filter_by_push_rules
&& !timeline_event.push_actions.iter().any(|a| a.should_notify())
{
return Ok(None);
}
let mut raw_event: AnySyncTimelineEvent =
timeline_event.event.deserialize().map_err(|_| Error::InvalidRumaEvent)?.into();
let event_type = raw_event.event_type();
let is_still_encrypted =
matches!(event_type, ruma::events::TimelineEventType::RoomEncrypted);
#[cfg(feature = "unstable-msc3956")]
let is_still_encrypted =
is_still_encrypted || matches!(event_type, ruma::events::TimelineEventType::Encrypted);
if is_still_encrypted && self.retry_decryption {
// The message is still encrypted, and the client is configured to retry
// decryption.
//
// Spawn an `EncryptionSync` that runs two iterations of the sliding sync loop:
// - the first iteration allows to get SS events as well as send e2ee requests.
// - the second one let the SS proxy forward events triggered by the sending of
// e2ee requests.
//
// Keep timeouts small for both, since we might be short on time.
let with_locking = WithLocking::from(self.with_cross_process_lock);
let encryption_sync = EncryptionSync::new(
"notifications".to_owned(),
self.client.clone(),
Some((Duration::from_secs(3), Duration::from_secs(1))),
with_locking,
)
.await;
// Just log out errors, but don't have them abort the notification processing:
// an undecrypted notification is still better than no
// notifications.
match encryption_sync {
Ok(sync) => match sync.run_fixed_iterations(2).await {
Ok(()) => match room.decrypt_event(timeline_event.event.cast_ref()).await {
Ok(decrypted_event) => {
timeline_event = decrypted_event;
raw_event = timeline_event
.event
.deserialize()
.map_err(|_| Error::InvalidRumaEvent)?
.into();
}
Err(err) => {
tracing::warn!(
"error when retrying decryption in get_notification: {err:#}"
);
}
},
Err(err) => {
tracing::warn!(
"error when running encryption_sync in get_notification: {err:#}"
);
}
},
Err(err) => {
tracing::warn!(
"error when building encryption_sync in get_notification: {err:#}",
);
}
}
}
let sender = match &room {
Room::Invited(invited) => invited.invite_details().await?.inviter,
_ => room.get_member(raw_event.sender()).await?,
};
let (sender_display_name, sender_avatar_url) = match sender {
Some(sender) => (
sender.display_name().map(|s| s.to_owned()),
sender.avatar_url().map(|s| s.to_string()),
),
None => (None, None),
};
let is_noisy = timeline_event.push_actions.iter().any(|a| a.sound().is_some());
let item = NotificationItem {
event: raw_event,
sender_display_name,
sender_avatar_url,
room_display_name: room.display_name().await?.to_string(),
room_avatar_url: room.avatar_url().map(|s| s.to_string()),
room_canonical_alias: room.canonical_alias().map(|c| c.to_string()),
is_direct_message_room: room.is_direct().await?,
is_room_encrypted: room.is_encrypted().await.ok(),
joined_members_count: room.joined_members_count(),
is_noisy,
};
Ok(Some(item))
}
}
/// Builder for a `NotificationClient`.
///
/// Fields have the same meaning as in `NotificationClient`.
#[derive(Clone)]
pub struct NotificationClientBuilder {
client: Client,
retry_decryption: bool,
with_cross_process_lock: bool,
filter_by_push_rules: bool,
}
impl NotificationClientBuilder {
fn new(client: Client) -> Self {
Self {
with_cross_process_lock: false,
filter_by_push_rules: false,
retry_decryption: false,
client,
}
}
/// Filter out the notification event according to the push rules present in
/// the event.
pub fn filter_by_push_rules(mut self) -> Self {
self.filter_by_push_rules = true;
self
}
/// Automatically retry decryption once, if the notification was received
/// encrypted.
///
/// The boolean indicates whether we're making use of a cross-process lock
/// for the crypto-store. This should be set to true, if and only if,
/// the notification is received in a process that's different from the
/// main app.
pub fn retry_decryption(mut self, with_cross_process_lock: bool) -> Self {
self.retry_decryption = true;
self.with_cross_process_lock = with_cross_process_lock;
self
}
/// Finishes configuring the `NotificationClient`.
pub fn build(self) -> NotificationClient {
NotificationClient {
client: self.client,
with_cross_process_lock: self.with_cross_process_lock,
filter_by_push_rules: self.filter_by_push_rules,
retry_decryption: self.retry_decryption,
}
}
}
/// A notification with its full content.
pub struct NotificationItem {
/// Underlying Ruma event.
pub event: AnySyncTimelineEvent,
/// Display name of the sender.
pub sender_display_name: Option<String>,
/// Avatar URL of the sender.
pub sender_avatar_url: Option<String>,
/// Room display name.
pub room_display_name: String,
/// Room avatar URL.
pub room_avatar_url: Option<String>,
/// Room canonical alias.
pub room_canonical_alias: Option<String>,
/// Is this room encrypted?
pub is_room_encrypted: Option<bool>,
/// Is this room considered a direct message?
pub is_direct_message_room: bool,
/// Numbers of members who joined the room.
pub joined_members_count: u64,
/// Is it a noisy notification? (i.e. does any push action contain a sound
/// action)
pub is_noisy: bool,
}
/// An error for the [`NotificationClient`].
#[derive(Debug, Error)]
pub enum Error {
/// The room associated to this event wasn't found.
#[error("unknown room for a notification")]
UnknownRoom,
/// The Ruma event contained within this notification couldn't be parsed.
#[error("invalid ruma event")]
InvalidRumaEvent,
/// An error forwarded from the client.
#[error(transparent)]
SdkError(#[from] matrix_sdk::Error),
/// An error forwarded from the underlying state store.
#[error(transparent)]
StoreError(#[from] StoreError),
}

View File

@@ -1,16 +1,24 @@
use std::sync::Mutex;
use futures_util::{pin_mut, StreamExt as _};
use matrix_sdk::SlidingSync;
use matrix_sdk_test::async_test;
use matrix_sdk_ui::encryption_sync::{EncryptionSync, EncryptionSyncMode};
use matrix_sdk_ui::encryption_sync::{EncryptionSync, WithLocking};
use serde_json::json;
use wiremock::{Match as _, Mock, MockGuard, MockServer, Request, ResponseTemplate};
use crate::{logged_in_client, sliding_sync_then_assert_request_and_fake_response};
use crate::{
logged_in_client,
sliding_sync::{PartialSlidingSyncRequest, SlidingSyncMatcher},
sliding_sync_then_assert_request_and_fake_response,
};
#[async_test]
async fn test_smoke_encryption_sync_works() -> anyhow::Result<()> {
let (client, server) = logged_in_client().await;
let encryption_sync =
EncryptionSync::new("tests".to_owned(), client, EncryptionSyncMode::NeverStop).await?;
EncryptionSync::new("tests".to_owned(), client, None, WithLocking::Yes).await?;
let stream = encryption_sync.sync();
pin_mut!(stream);
@@ -127,38 +135,86 @@ async fn test_smoke_encryption_sync_works() -> anyhow::Result<()> {
Ok(())
}
async fn setup_mocking_sliding_sync_server(server: &MockServer) -> MockGuard {
let pos = Mutex::new(0);
Mock::given(SlidingSyncMatcher)
.respond_with(move |request: &Request| {
let partial_request: PartialSlidingSyncRequest = request.body_json().unwrap();
// Repeat the transaction id in the response, to validate sticky parameters.
let mut pos = pos.lock().unwrap();
*pos += 1;
let pos_as_str = (*pos).to_string();
ResponseTemplate::new(200).set_body_json(json!({
"txn_id": partial_request.txn_id,
"pos": pos_as_str
}))
})
.mount_as_scoped(server)
.await
}
async fn check_requests(server: MockServer, expected_requests: &[serde_json::Value]) {
let mut num_requests = 0;
for request in &server.received_requests().await.expect("Request recording has been disabled") {
if !SlidingSyncMatcher.matches(request) {
continue;
}
assert!(
num_requests < expected_requests.len(),
"unexpected extra requests received in the server"
);
let mut json_value = serde_json::from_slice::<serde_json::Value>(&request.body).unwrap();
// Strip the transaction id, if present.
if let Some(root) = json_value.as_object_mut() {
root.remove("txn_id");
}
if let Err(error) = assert_json_diff::assert_json_matches_no_panic(
&json_value,
&expected_requests[num_requests],
assert_json_diff::Config::new(assert_json_diff::CompareMode::Strict),
) {
dbg!(json_value);
panic!("{error}");
}
num_requests += 1;
}
assert_eq!(num_requests, expected_requests.len(), "missing requests");
}
#[async_test]
async fn test_encryption_sync_one_fixed_iteration() -> anyhow::Result<()> {
let (client, server) = logged_in_client().await;
let _guard = setup_mocking_sliding_sync_server(&server).await;
let encryption_sync =
EncryptionSync::new("tests".to_owned(), client, EncryptionSyncMode::RunFixedIterations(1))
.await?;
EncryptionSync::new("tests".to_owned(), client, None, WithLocking::Yes).await?;
let stream = encryption_sync.sync();
pin_mut!(stream);
// Run all the iterations.
encryption_sync.run_fixed_iterations(1).await?;
sliding_sync_then_assert_request_and_fake_response! {
[server, stream]
assert request = {
"conn_id": "encryption",
"extensions": {
"e2ee": {
"enabled": true
},
"to_device": {
"enabled": true
}
// Check the requests are the ones we've expected.
let expected_requests = [json!({
"conn_id": "encryption",
"extensions": {
"e2ee": {
"enabled": true
},
"to_device": {
"enabled": true
}
},
respond with = {
"pos": "0"
},
};
}
})];
// Only run once, so the next iteration on the stream should stop it.
assert!(stream.next().await.is_none());
assert!(stream.next().await.is_none());
check_requests(server, &expected_requests).await;
Ok(())
}
@@ -167,17 +223,17 @@ async fn test_encryption_sync_one_fixed_iteration() -> anyhow::Result<()> {
async fn test_encryption_sync_two_fixed_iterations() -> anyhow::Result<()> {
let (client, server) = logged_in_client().await;
let encryption_sync =
EncryptionSync::new("tests".to_owned(), client, EncryptionSyncMode::RunFixedIterations(2))
.await?;
let _guard = setup_mocking_sliding_sync_server(&server).await;
let stream = encryption_sync.sync();
pin_mut!(stream);
let encryption_sync =
EncryptionSync::new("tests".to_owned(), client, None, WithLocking::Yes).await?;
encryption_sync.run_fixed_iterations(2).await?;
// First iteration fills the whole request.
sliding_sync_then_assert_request_and_fake_response! {
[server, stream]
assert request = {
// Second iteration only sends non-sticky parameters.
let expected_requests = [
json!({
"conn_id": "encryption",
"extensions": {
"e2ee": {
@@ -187,26 +243,13 @@ async fn test_encryption_sync_two_fixed_iterations() -> anyhow::Result<()> {
"enabled": true
}
}
},
respond with = {
"pos": "0"
},
};
// Second iteration only sends non-sticky parameters.
sliding_sync_then_assert_request_and_fake_response! {
[server, stream]
assert request = {
}),
json!({
"conn_id": "encryption",
},
respond with = {
"pos": "1",
},
};
}),
];
// This was the last iteration, there should be no next one.
assert!(stream.next().await.is_none());
assert!(stream.next().await.is_none());
check_requests(server, &expected_requests).await;
Ok(())
}
@@ -216,8 +259,7 @@ async fn test_encryption_sync_always_reloads_todevice_token() -> anyhow::Result<
let (client, server) = logged_in_client().await;
let encryption_sync =
EncryptionSync::new("tests".to_owned(), client.clone(), EncryptionSyncMode::NeverStop)
.await?;
EncryptionSync::new("tests".to_owned(), client.clone(), None, WithLocking::Yes).await?;
let stream = encryption_sync.sync();
pin_mut!(stream);

View File

@@ -28,6 +28,8 @@ use wiremock::{
#[cfg(feature = "experimental-encryption-sync")]
mod encryption_sync;
#[cfg(feature = "experimental-notification-client")]
mod notification_client;
#[cfg(feature = "experimental-room-list")]
mod room_list_service;
mod sliding_sync;

View File

@@ -0,0 +1,94 @@
use std::time::Duration;
use matrix_sdk::config::SyncSettings;
use matrix_sdk_test::{async_test, EventBuilder, JoinedRoomBuilder, TimelineTestEvent};
use matrix_sdk_ui::notification_client::NotificationClient;
use ruma::{event_id, events::TimelineEventType, room_id, user_id};
use serde_json::json;
use wiremock::{
matchers::{header, method, path, path_regex},
Mock, ResponseTemplate,
};
use crate::{logged_in_client, mock_encryption_state, mock_sync};
#[async_test]
async fn test_notification_client_simple() {
let room_id = room_id!("!a98sd12bjh:example.org");
let (client, server) = logged_in_client().await;
let sync_settings = SyncSettings::new().timeout(Duration::from_millis(3000));
let event_id = event_id!("$example_event_id");
let sender = user_id!("@user:example.org");
let event_json = json!({
"content": {
"body": "Hello world!",
"msgtype": "m.text",
},
"room_id": room_id.clone(),
"event_id": event_id,
"origin_server_ts": 152049794,
"sender": sender.clone(),
"type": "m.room.message",
});
let mut ev_builder = EventBuilder::new();
ev_builder.add_joined_room(
JoinedRoomBuilder::new(room_id.clone())
.add_timeline_event(TimelineTestEvent::Custom(event_json.clone())),
);
mock_sync(&server, ev_builder.build_json_sync_response(), None).await;
let _response = client.sync_once(sync_settings.clone()).await.unwrap();
server.reset().await;
let notification_client = NotificationClient::builder(client).build();
{
Mock::given(method("GET"))
.and(path(format!("/_matrix/client/r0/rooms/{room_id}/event/{event_id}")))
.and(header("authorization", "Bearer 1234"))
.respond_with(ResponseTemplate::new(200).set_body_json(event_json))
.mount(&server)
.await;
Mock::given(method("GET"))
.and(path_regex(r"^/_matrix/client/r0/rooms/.*/members"))
.and(header("authorization", "Bearer 1234"))
.respond_with(ResponseTemplate::new(200).set_body_json(json!({
"chunk": [
{
"content": {
"avatar_url": null,
"displayname": "John Mastodon",
"membership": "join"
},
"room_id": room_id.clone(),
"event_id": "$151800140517rfvjc:example.org",
"membership": "join",
"origin_server_ts": 151800140,
"sender": sender.clone(),
"state_key": sender,
"type": "m.room.member",
"unsigned": {
"age": 2970366,
}
}
]
})))
.mount(&server)
.await;
mock_encryption_state(&server, false).await;
}
let item = notification_client.get_notification(room_id, event_id).await.unwrap();
server.reset().await;
let item = item.expect("the notification should be found");
assert_eq!(item.event.event_type(), TimelineEventType::RoomMessage);
assert_eq!(item.sender_display_name.as_deref(), Some("John Mastodon"));
}