feat(ui): allow retrieving push notification events in batches

This commit is contained in:
Jorge Martín
2025-05-09 09:47:05 +02:00
committed by Jorge Martin Espinosa
parent 1afad3ab78
commit 008c6f6d6c
2 changed files with 344 additions and 77 deletions

View File

@@ -1,9 +1,10 @@
use std::sync::Arc;
use std::{collections::HashMap, sync::Arc};
use matrix_sdk_ui::notification_client::{
NotificationClient as MatrixNotificationClient, NotificationItem as MatrixNotificationItem,
};
use ruma::{EventId, RoomId};
use ruma::{EventId, OwnedEventId, OwnedRoomId, RoomId};
use tracing::error;
use crate::{
client::{Client, JoinRule},
@@ -130,4 +131,66 @@ impl NotificationClient {
Ok(None)
}
}
/// Get several notification items in a single batch.
///
/// Returns an error if the flow failed when preparing to fetch the
/// notifications, and a [`HashMap`] containing either a
/// [`NotificationItem`] or no entry for it if it failed to fetch a
/// notification for the provided [`EventId`].
pub async fn get_notifications(
&self,
requests: Vec<NotificationItemsRequest>,
) -> Result<HashMap<String, NotificationItem>, ClientError> {
let requests =
requests.into_iter().map(TryInto::try_into).collect::<Result<Vec<_>, _>>()?;
let items = self.inner.get_notifications(&requests).await?;
let mut result = HashMap::new();
for (key, value) in items.into_iter() {
match value {
Ok(item) => {
result.insert(key.to_string(), NotificationItem::from_inner(item));
}
Err(error) => {
// TODO This error should actually be returned so the clients can handle the
// error as they see fit, but it's failing when creating
// bindings for Go, i.e.
// (https://github.com/NordSecurity/uniffi-bindgen-go/issues/62)
error!("Could not fetch notification {key}, an error happened: {error}");
}
}
}
Ok(result)
}
}
/// A request for notification items grouped by their room.
#[derive(uniffi::Record)]
pub struct NotificationItemsRequest {
room_id: String,
event_ids: Vec<String>,
}
impl NotificationItemsRequest {
/// The parsed [`OwnedRoomId`] to use with the SDK crates.
pub fn room_id(&self) -> Result<OwnedRoomId, ClientError> {
RoomId::parse(&self.room_id).map_err(ClientError::from)
}
/// The parsed [`OwnedEventId`] list to use with the SDK crates.
pub fn event_ids(&self) -> Result<Vec<OwnedEventId>, ClientError> {
self.event_ids
.iter()
.map(|id| EventId::parse(id).map_err(ClientError::from))
.collect::<Result<Vec<_>, _>>()
}
}
impl TryFrom<NotificationItemsRequest>
for matrix_sdk_ui::notification_client::NotificationItemsRequest
{
type Error = ClientError;
fn try_from(value: NotificationItemsRequest) -> Result<Self, Self::Error> {
Ok(Self { room_id: value.room_id()?, event_ids: value.event_ids()? })
}
}

View File

@@ -13,6 +13,10 @@
// limitations under the License.
use std::{
collections::{
btree_map::{IntoIter, Iter},
BTreeMap,
},
sync::{Arc, Mutex},
time::Duration,
};
@@ -39,7 +43,7 @@ use ruma::{
html::RemoveReplyFallback,
push::Action,
serde::Raw,
uint, EventId, OwnedEventId, RoomId, UserId,
uint, EventId, OwnedEventId, OwnedRoomId, RoomId, UserId,
};
use thiserror::Error;
use tokio::sync::Mutex as AsyncMutex;
@@ -157,6 +161,56 @@ impl NotificationClient {
}
}
/// Fetches the content of several notifications.
///
/// This will first try to get the notifications using a short-lived sliding
/// sync, and if the sliding-sync can't find the events, then it'll use a
/// `/context` query to find the events with associated member information.
///
/// An error result at the top level means that something failed when trying
/// to set up the notification fetching. For each notification item you can
/// also receive an error, which means something failed when trying to fetch
/// that particular notification (decryption, fetching push actions, etc.);
/// in that case, a dummy notification may be displayed instead. A
/// `None` result means the notification has been filtered out by the
/// user's push rules.
pub async fn get_notifications(
&self,
requests: &[NotificationItemsRequest],
) -> Result<BatchNotificationFetchingResult<NotificationItem>, Error> {
let mut notifications = self.get_notifications_with_sliding_sync(requests).await?;
let mut notification_items = BatchNotificationFetchingResult::new();
for request in requests {
for event_id in &request.event_ids {
match notifications.remove(event_id) {
Some(Ok(NotificationStatus::Event(item))) => {
notification_items.add_notification(event_id.to_owned(), item);
}
Some(Ok(NotificationStatus::EventNotFound)) | None => {
match self.get_notification_with_context(&request.room_id, event_id).await {
Ok(Some(item)) => {
notification_items.add_notification(event_id.to_owned(), item)
}
// Event filtered out, do nothing
Ok(None) => (),
Err(error) => notification_items
.mark_fetching_notification_failed(event_id.to_owned(), error),
}
}
// Event filtered out, do nothing
Some(Ok(NotificationStatus::EventFilteredOut)) => (),
Some(Err(e)) => {
notification_items
.mark_fetching_notification_failed(event_id.to_owned(), e);
}
}
}
}
Ok(notification_items)
}
/// Run an encryption sync loop, in case an event is still encrypted.
///
/// Will return true if and only:
@@ -300,10 +354,10 @@ impl NotificationClient {
}
}
/// Try to run a sliding sync (without encryption) to retrieve the event
/// Try to run a sliding sync (without encryption) to retrieve the events
/// from the notification.
///
/// The event can either be:
/// An event can either be:
/// - an invite event,
/// - or a non-invite event.
///
@@ -321,9 +375,8 @@ impl NotificationClient {
#[instrument(skip_all)]
async fn try_sliding_sync(
&self,
room_id: &RoomId,
event_id: &EventId,
) -> Result<Option<RawNotificationEvent>, Error> {
requests: &[NotificationItemsRequest],
) -> Result<BTreeMap<OwnedEventId, (OwnedRoomId, Option<RawNotificationEvent>)>, Error> {
// Serialize all the calls to this method by taking a lock at the beginning,
// that will be dropped later.
let _guard = self.notification_sync_mutex.lock().await;
@@ -332,20 +385,34 @@ impl NotificationClient {
// notification, so we can figure out the full event and associated
// information.
let raw_notification = Arc::new(Mutex::new(None));
let raw_notifications = Arc::new(Mutex::new(BTreeMap::new()));
let handler_raw_notification = raw_notification.clone();
let target_event_id = event_id.to_owned();
let handler_raw_notification = raw_notifications.clone();
let timeline_event_handler =
self.client.add_event_handler(move |raw: Raw<AnySyncTimelineEvent>| async move {
match raw.get_field::<OwnedEventId>("event_id") {
let requests = Arc::new(requests.iter().map(|req| (*req).clone()).collect::<Vec<_>>());
let timeline_event_handler = self.client.add_event_handler({
let requests = requests.clone();
move |raw: Raw<AnySyncTimelineEvent>| async move {
match &raw.get_field::<OwnedEventId>("event_id") {
Ok(Some(event_id)) => {
if event_id == target_event_id {
// found it! There shouldn't be a previous event before, but if there
// is, that should be ok to just replace it.
*handler_raw_notification.lock().unwrap() =
Some(RawNotificationEvent::Timeline(raw));
let request =
&requests.iter().find(|request| request.event_ids.contains(event_id));
if request.is_none() {
return;
}
let room_id = request.unwrap().room_id.clone();
for request in requests.iter() {
if request.event_ids.contains(event_id) {
// found it! There shouldn't be a previous event before, but if
// there is, that should be ok to
// just replace it.
handler_raw_notification.lock().unwrap().insert(
event_id.to_owned(),
(room_id, Some(RawNotificationEvent::Timeline(raw))),
);
return;
}
}
}
Ok(None) => {
@@ -355,17 +422,18 @@ impl NotificationClient {
warn!("a sync event id couldn't be decoded: {err}");
}
}
});
}
});
// We'll only use this event if the room is in the invited state.
let raw_invite = Arc::new(Mutex::new(None));
let raw_invites = Arc::new(Mutex::new(BTreeMap::new()));
let target_event_id = event_id.to_owned();
let user_id = self.client.user_id().unwrap().to_owned();
let handler_raw_invite = raw_invite.clone();
let handler_raw_notification = raw_notification.clone();
let stripped_member_handler =
self.client.add_event_handler(move |raw: Raw<StrippedRoomMemberEvent>| async move {
let handler_raw_invites = raw_invites.clone();
let handler_raw_notifications = raw_notifications.clone();
let stripped_member_handler = self.client.add_event_handler({
let requests = requests.clone();
move |raw: Raw<StrippedRoomMemberEvent>| async move {
let deserialized = match raw.deserialize() {
Ok(d) => d,
Err(err) => {
@@ -378,15 +446,23 @@ impl NotificationClient {
// Try to match the event by event_id, as it's the most precise. In theory, we
// shouldn't receive it, so that's a first attempt.
match raw.get_field::<OwnedEventId>("event_id") {
match &raw.get_field::<OwnedEventId>("event_id") {
Ok(Some(event_id)) => {
if event_id == target_event_id {
// found it! There shouldn't be a previous event before, but if there
// is, that should be ok to just replace it.
*handler_raw_notification.lock().unwrap() =
Some(RawNotificationEvent::Invite(raw));
let request =
&requests.iter().find(|request| request.event_ids.contains(event_id));
if request.is_none() {
return;
}
let room_id = request.unwrap().room_id.clone();
// found it! There shouldn't be a previous event before, but if
// there is, that should be ok to
// just replace it.
handler_raw_notifications.lock().unwrap().insert(
event_id.to_owned(),
(room_id, Some(RawNotificationEvent::Invite(raw))),
);
return;
}
Ok(None) => {
debug!("a room member event had no id");
@@ -404,11 +480,15 @@ impl NotificationClient {
// This could be it! There might be several of these following each other, so
// assume it's the latest one (in sync ordering), and override a previous one if
// present.
*handler_raw_invite.lock().unwrap() = Some(RawNotificationEvent::Invite(raw));
handler_raw_invites
.lock()
.unwrap()
.insert(deserialized.state_key, Some(RawNotificationEvent::Invite(raw)));
} else {
debug!("not an invite event, or not for the current user");
}
});
}
});
// Room power levels are necessary to build the push context.
let required_state = vec![
@@ -442,8 +522,9 @@ impl NotificationClient {
.build()
.await?;
let room_ids = requests.iter().map(|req| req.room_id.as_ref()).collect::<Vec<_>>();
sync.subscribe_to_rooms(
&[room_id],
&room_ids,
Some(assign!(http::request::RoomSubscription::default(), {
required_state,
timeline_limit: uint!(16)
@@ -456,14 +537,19 @@ impl NotificationClient {
let stream = sync.sync();
pin_mut!(stream);
// Sum the expected event count for each room
let expected_event_count = requests.iter().map(|req| req.event_ids.len()).sum::<usize>();
loop {
if stream.next().await.is_none() {
// Sliding sync aborted early.
break;
}
if raw_notification.lock().unwrap().is_some() || raw_invite.lock().unwrap().is_some() {
// We got the event.
if raw_notifications.lock().unwrap().len() + raw_invites.lock().unwrap().len()
== expected_event_count
{
// We got the events.
break;
}
@@ -477,13 +563,29 @@ impl NotificationClient {
self.client.remove_event_handler(stripped_member_handler);
self.client.remove_event_handler(timeline_event_handler);
let mut maybe_event = raw_notification.lock().unwrap().take();
let mut notifications = raw_notifications.clone().lock().unwrap().clone();
let mut missing_event_ids = Vec::new();
if maybe_event.is_none() {
// Create the list of missing event ids after the syncs
for request in requests.iter() {
for event_id in &request.event_ids {
if !notifications.contains_key(event_id) {
missing_event_ids.push((request.room_id.to_owned(), event_id.to_owned()));
}
}
}
// Try checking if the missing notifications could be invites
for (room_id, missing_event_id) in missing_event_ids {
trace!("we didn't have a non-invite event, looking for invited room now");
if let Some(room) = self.client.get_room(room_id) {
if let Some(room) = self.client.get_room(&room_id) {
if room.state() == RoomState::Invited {
maybe_event = raw_invite.lock().unwrap().take();
if let Some((_, stripped_event)) = raw_invites.lock().unwrap().pop_first() {
notifications.insert(
missing_event_id.to_owned(),
(room_id.to_owned(), stripped_event),
);
}
} else {
debug!("the room isn't in the invited state");
}
@@ -492,56 +594,117 @@ impl NotificationClient {
}
}
let found = if maybe_event.is_some() { "" } else { "not " };
trace!("the notification event has been {found}found");
let found = if notifications.len() == expected_event_count { "" } else { "not " };
trace!("all notification events have{found} been found");
Ok(maybe_event)
Ok(notifications)
}
/// Get a full notification, given a room id and event id.
///
/// This will run a small sliding sync to retrieve the content of the event,
/// along with extra data to form a rich notification context.
pub async fn get_notification_with_sliding_sync(
&self,
room_id: &RoomId,
event_id: &EventId,
) -> Result<NotificationStatus, Error> {
let Some(mut raw_event) = self.try_sliding_sync(room_id, event_id).await? else {
return Ok(NotificationStatus::EventNotFound);
};
let event_ids = vec![event_id.to_owned()];
let request = NotificationItemsRequest { room_id: room_id.to_owned(), event_ids };
let mut get_notifications_result =
self.get_notifications_with_sliding_sync(&[request]).await?;
get_notifications_result.remove(event_id).unwrap_or(Ok(NotificationStatus::EventNotFound))
}
// At this point it should have been added by the sync, if it's not, give up.
let Some(room) = self.client.get_room(room_id) else { return Err(Error::UnknownRoom) };
/// Get a list of full notifications, given a room id and event ids.
///
/// This will run a small sliding sync to retrieve the content of the
/// events, along with extra data to form a rich notification context.
pub async fn get_notifications_with_sliding_sync(
&self,
requests: &[NotificationItemsRequest],
) -> Result<BatchNotificationFetchingResult<NotificationStatus>, Error> {
let raw_events = self.try_sliding_sync(requests).await?;
let push_actions = match &raw_event {
RawNotificationEvent::Timeline(timeline_event) => {
// Timeline events may be encrypted, so make sure they get decrypted first.
if let Some(mut timeline_event) =
self.retry_decryption(&room, timeline_event).await?
{
let push_actions = timeline_event.push_actions.take();
raw_event = RawNotificationEvent::Timeline(timeline_event.into_raw());
push_actions
let mut result = BatchNotificationFetchingResult::new();
for (event_id, (room_id, raw_event)) in raw_events.into_iter() {
// At this point it should have been added by the sync, if it's not, give up.
let Some(room) = self.client.get_room(&room_id) else { return Err(Error::UnknownRoom) };
if let Some(raw_event) = raw_event {
let (raw_event, push_actions) = match &raw_event {
RawNotificationEvent::Timeline(timeline_event) => {
// Timeline events may be encrypted, so make sure they get decrypted first.
match self.retry_decryption(&room, timeline_event).await {
Ok(Some(mut timeline_event)) => {
let push_actions = timeline_event.push_actions.take();
(
RawNotificationEvent::Timeline(timeline_event.into_raw()),
push_actions,
)
}
Ok(None) => {
match room.event_push_actions(timeline_event).await {
Ok(push_actions) => (raw_event.clone(), push_actions),
Err(error) => {
// Could not get push actions,
result.mark_fetching_notification_failed(
event_id,
error.into(),
);
continue;
}
}
}
Err(error) => {
result.mark_fetching_notification_failed(event_id, error);
continue;
}
}
}
RawNotificationEvent::Invite(invite_event) => {
// Invite events can't be encrypted, so they should be in clear text.
match room.event_push_actions(invite_event).await {
Ok(push_actions) => {
(RawNotificationEvent::Invite(invite_event.clone()), push_actions)
}
Err(error) => {
result.mark_fetching_notification_failed(event_id, error.into());
continue;
}
}
}
};
let should_notify = push_actions
.as_ref()
.map(|actions| actions.iter().any(|a| a.should_notify()))
.unwrap_or(false);
if !should_notify {
result.add_notification(event_id, NotificationStatus::EventFilteredOut);
} else {
room.event_push_actions(timeline_event).await?
}
}
RawNotificationEvent::Invite(invite_event) => {
// Invite events can't be encrypted, so they should be in clear text.
room.event_push_actions(invite_event).await?
}
};
let notification_status = NotificationItem::new(
&room,
raw_event,
push_actions.as_deref(),
Vec::new(),
)
.await
.map(NotificationStatus::Event);
if let Some(push_actions) = &push_actions {
if !push_actions.iter().any(|a| a.should_notify()) {
return Ok(NotificationStatus::EventFilteredOut);
match notification_status {
Ok(notification_item) => {
result.add_notification(event_id, notification_item);
}
Err(error) => {
result.mark_fetching_notification_failed(event_id, error);
}
}
}
} else {
result.add_notification(event_id, NotificationStatus::EventNotFound);
}
}
Ok(NotificationStatus::Event(
NotificationItem::new(&room, raw_event, push_actions.as_deref(), Vec::new()).await?,
))
Ok(result)
}
/// Retrieve a notification using a `/context` query.
@@ -613,11 +776,52 @@ pub enum NotificationStatus {
EventFilteredOut,
}
#[derive(Debug, Clone)]
pub struct NotificationItemsRequest {
pub room_id: OwnedRoomId,
pub event_ids: Vec<OwnedEventId>,
}
#[derive(Default)]
pub struct BatchNotificationFetchingResult<T> {
notifications: BTreeMap<OwnedEventId, Result<T, Error>>,
}
impl<T> BatchNotificationFetchingResult<T> {
pub fn new() -> Self {
Self { notifications: BTreeMap::new() }
}
fn add_notification(&mut self, event_id: OwnedEventId, notification: T) {
self.notifications.insert(event_id, Ok(notification));
}
fn mark_fetching_notification_failed(&mut self, event_id: OwnedEventId, error: Error) {
self.notifications.insert(event_id, Err(error));
}
pub fn remove(&mut self, id: &EventId) -> Option<Result<T, Error>> {
self.notifications.remove(id)
}
pub fn iter(&self) -> Iter<'_, OwnedEventId, Result<T, Error>> {
self.notifications.iter()
}
}
impl<T> IntoIterator for BatchNotificationFetchingResult<T> {
type Item = (OwnedEventId, Result<T, Error>);
type IntoIter = IntoIter<OwnedEventId, Result<T, Error>>;
fn into_iter(self) -> Self::IntoIter {
self.notifications.into_iter()
}
}
/// The Notification event as it was fetched from remote for the
/// given `event_id`, represented as Raw but decrypted, thus only
/// whether it is an invite or regular Timeline event has been
/// determined.
#[derive(Debug)]
#[derive(Debug, Clone)]
pub enum RawNotificationEvent {
/// The raw event for a timeline event
Timeline(Raw<AnySyncTimelineEvent>),