From 2829d88f1c08f87d8d5fbed81fae064d23595342 Mon Sep 17 00:00:00 2001 From: Timo Date: Mon, 7 Apr 2025 18:58:41 +0200 Subject: [PATCH] WidgetDriver: add matrix driver toDevice support (reading and sending events via cs api) This also hooks up the widget via the machine actions. And adds toDevice events to the subscription. --- crates/matrix-sdk/src/widget/matrix.rs | 258 +++++++++++++++++++++++-- crates/matrix-sdk/src/widget/mod.rs | 23 ++- 2 files changed, 256 insertions(+), 25 deletions(-) diff --git a/crates/matrix-sdk/src/widget/matrix.rs b/crates/matrix-sdk/src/widget/matrix.rs index 0f6ccec06..15de1da8c 100644 --- a/crates/matrix-sdk/src/widget/matrix.rs +++ b/crates/matrix-sdk/src/widget/matrix.rs @@ -17,28 +17,36 @@ use std::collections::BTreeMap; -use matrix_sdk_base::deserialized_responses::RawAnySyncOrStrippedState; +use futures_util::future::join_all; +use matrix_sdk_base::deserialized_responses::{EncryptionInfo, RawAnySyncOrStrippedState}; +use matrix_sdk_common::executor::spawn; use ruma::{ api::client::{ account::request_openid_token::v3::{Request as OpenIdRequest, Response as OpenIdResponse}, delayed_events::{self, update_delayed_event::unstable::UpdateAction}, filter::RoomEventFilter, + to_device::send_event_to_device::{self, v3::Request as RumaToDeviceRequest}, }, assign, events::{ AnyMessageLikeEventContent, AnyStateEventContent, AnySyncMessageLikeEvent, - AnySyncStateEvent, AnySyncTimelineEvent, AnyTimelineEvent, MessageLikeEventType, - StateEventType, TimelineEventType, + AnySyncStateEvent, AnySyncTimelineEvent, AnyTimelineEvent, AnyToDeviceEvent, + AnyToDeviceEventContent, MessageLikeEventType, StateEventType, TimelineEventType, + ToDeviceEventType, }, serde::{from_raw_json_value, Raw}, - EventId, RoomId, TransactionId, + to_device::DeviceIdOrAllDevices, + EventId, OwnedUserId, RoomId, TransactionId, UserId, }; use serde_json::{value::RawValue as RawJsonValue, Value}; use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver}; -use tracing::error; +use tracing::{error, info, warn}; use super::{machine::SendEventResponse, StateKeySelector}; -use crate::{event_handler::EventHandlerDropGuard, room::MessagesOptions, Error, Result, Room}; +use crate::{ + encryption::identities::Device, event_handler::EventHandlerDropGuard, room::MessagesOptions, + Client, Error, Result, Room, +}; /// Thin wrapper around a [`Room`] that provides functionality relevant for /// widgets. @@ -86,7 +94,11 @@ impl MatrixDriver { ) -> Result>> { let room_id = self.room.room_id(); let convert = |sync_or_stripped_state| match sync_or_stripped_state { - RawAnySyncOrStrippedState::Sync(ev) => Some(attach_room_id(ev.cast_ref(), room_id)), + RawAnySyncOrStrippedState::Sync(ev) => with_attached_room_id(ev.cast_ref(), room_id) + .map_err(|e| { + error!("failed to convert event from `get_state_event` response:{}", e) + }) + .ok(), RawAnySyncOrStrippedState::Stripped(_) => { error!("MatrixDriver can't operate in invited rooms"); None @@ -181,7 +193,7 @@ impl MatrixDriver { /// Starts forwarding new room events. Once the returned `EventReceiver` /// is dropped, forwarding will be stopped. - pub(crate) fn events(&self) -> EventReceiver { + pub(crate) fn events(&self) -> EventReceiver> { let (tx, rx) = unbounded_channel(); let room_id = self.room.room_id().to_owned(); @@ -190,14 +202,29 @@ impl MatrixDriver { let _room_id = room_id.clone(); let handle_msg_like = self.room.add_event_handler(move |raw: Raw| { - let _ = _tx.send(attach_room_id(raw.cast_ref(), &_room_id)); + match with_attached_room_id(raw.cast_ref(), &_room_id) { + Ok(event_with_room_id) => { + let _ = _tx.send(event_with_room_id); + } + Err(e) => { + error!("Failed to attach room id to message like event: {}", e); + } + } async {} }); let drop_guard_msg_like = self.room.client().event_handler_drop_guard(handle_msg_like); - + let _room_id = room_id; + let _tx = tx; // Get only all state events from the state section of the sync. let handle_state = self.room.add_event_handler(move |raw: Raw| { - let _ = tx.send(attach_room_id(raw.cast_ref(), &room_id)); + match with_attached_room_id(raw.cast_ref(), &_room_id) { + Ok(event_with_room_id) => { + let _ = _tx.send(event_with_room_id); + } + Err(e) => { + error!("Failed to attach room id to state event: {}", e); + } + } async {} }); let drop_guard_state = self.room.client().event_handler_drop_guard(handle_state); @@ -208,25 +235,214 @@ impl MatrixDriver { // section of the sync will not be forwarded to the widget. // TODO annotate the events and send both timeline and state section state // events. - EventReceiver { rx, _drop_guards: [drop_guard_msg_like, drop_guard_state] } + EventReceiver { rx, _drop_guards: vec![drop_guard_msg_like, drop_guard_state] } + } + + /// Starts forwarding new room events. Once the returned `EventReceiver` + /// is dropped, forwarding will be stopped. + pub(crate) fn to_device_events(&self) -> EventReceiver> { + let (tx, rx) = unbounded_channel(); + + let to_device_handle = self.room.client().add_event_handler( + move |raw: Raw, encryption_info: Option| { + match with_attached_encryption_flag(raw, &encryption_info) { + Ok(ev) => { + let _ = tx.send(ev); + } + Err(e) => { + error!("Failed to attach encryption flag to to_device event: {}", e); + } + } + async {} + }, + ); + + let drop_guard = self.room.client().event_handler_drop_guard(to_device_handle); + EventReceiver { rx, _drop_guards: vec![drop_guard] } + } + + /// It will ignore all devices where errors occurred or where the device is + /// not verified or where th user has a has_verification_violation. + pub(crate) async fn send_to_device( + &self, + event_type: ToDeviceEventType, + encrypted: bool, + messages: BTreeMap< + OwnedUserId, + BTreeMap>, + >, + ) -> Result { + let client = self.room.client(); + /// This encrypts to device content for a collection of devices. + /// It will ignore all devices where errors occurred or where the device + /// is not verified or where th user has a has_verification_violation. + async fn encrypted_content_for_devices( + unencrypted_content: &Raw, + devices: Vec, + event_type: &ToDeviceEventType, + ) -> Result)>> + { + let content: Value = + unencrypted_content.deserialize_as().map_err(Into::::into)?; + let event_type = event_type.clone(); + let device_content_tasks = devices.into_iter().map(|device| spawn({ + let event_type = event_type.clone(); + let content = content.clone(); + + async move { + if !device.is_cross_signed_by_owner() { + info!("Device {} is not verified, skipping encryption", device.device_id()); + return None; + } + match device + .inner + .encrypt_event_raw(&event_type.to_string(), &content) + .await { + Ok(encrypted) => Some((device.device_id().to_owned().into(), encrypted.cast())), + Err(e) =>{ info!("Failed to encrypt to_device event from widget for device: {} because, {}", device.device_id(), e); None}, + } + } + })); + let device_encrypted_content_map = + join_all(device_content_tasks).await.into_iter().flatten().flatten(); + Ok(device_encrypted_content_map) + } + + /// Convert the device content map for one user into the same content + /// map with encrypted content This needs to flatten the vectors + /// we get from `encrypted_content_for_devices` + /// since one `DeviceIdOrAllDevices` id can be multiple devices. + async fn encrypted_device_content_map( + client: &Client, + user_id: &UserId, + event_type: &ToDeviceEventType, + device_content_map: BTreeMap>, + ) -> BTreeMap> { + let device_map_futures = + device_content_map.into_iter().map(|(device_or_all_id, content)| spawn({ + let client = client.clone(); + let user_id = user_id.to_owned(); + let event_type = event_type.clone(); + async move { + let Ok(user_devices) = client.encryption().get_user_devices(&user_id).await else { + warn!("Failed to get user devices for user: {}", user_id); + return None; + }; + let Ok(user_identity) = client.encryption().get_user_identity(&user_id).await else{ + warn!("Failed to get user identity for user: {}", user_id); + return None; + }; + if user_identity.map(|i|i.has_verification_violation()).unwrap_or(false) { + info!("User {} has a verification violation, skipping encryption", user_id); + return None; + } + let devices: Vec = match device_or_all_id { + DeviceIdOrAllDevices::DeviceId(device_id) => { + vec![user_devices.get(&device_id)].into_iter().flatten().collect() + } + DeviceIdOrAllDevices::AllDevices => user_devices.devices().collect(), + }; + encrypted_content_for_devices( + &content, + devices, + &event_type, + ) + .await + .map_err(|e| info!("WidgetDriver: could not encrypt content for to device widget event content: {}. because, {}", content.json(), e)) + .ok() + }})); + let content_map_iterator = join_all(device_map_futures).await.into_iter(); + + // The first flatten takes the iterator over Result)>>, JoinError>> + // and flattens the Result (drops Err() items) + // The second takes the iterator over: Option)>> + // and flattens the Option (drops None items) + // The third takes the iterator over iterators: impl Iterator)> + // and flattens it to just an iterator over (DeviceIdOrAllDevices, + // Raw) + content_map_iterator.flatten().flatten().flatten().collect() + } + + let request = if encrypted { + // We first want to get all missing session before we start any to device + // sending! + client.claim_one_time_keys(messages.keys().map(|u| u.as_ref())).await?; + let encrypted_content: BTreeMap< + OwnedUserId, + BTreeMap>, + > = join_all(messages.into_iter().map(|(user_id, device_content_map)| { + let event_type = event_type.clone(); + async move { + ( + user_id.clone(), + encrypted_device_content_map( + &self.room.client(), + &user_id, + &event_type, + device_content_map, + ) + .await, + ) + } + })) + .await + .into_iter() + .collect(); + + RumaToDeviceRequest::new_raw( + ToDeviceEventType::RoomEncrypted, + TransactionId::new(), + encrypted_content, + ) + } else { + RumaToDeviceRequest::new_raw(event_type, TransactionId::new(), messages) + }; + + let response = client.send(request).await; + + response.map_err(Into::into) } } /// A simple entity that wraps an `UnboundedReceiver` /// along with the drop guard for the room event handler. -pub(crate) struct EventReceiver { - rx: UnboundedReceiver>, - _drop_guards: [EventHandlerDropGuard; 2], +pub(crate) struct EventReceiver { + rx: UnboundedReceiver, + _drop_guards: Vec, } -impl EventReceiver { - pub(crate) async fn recv(&mut self) -> Option> { +impl EventReceiver { + pub(crate) async fn recv(&mut self) -> Option { self.rx.recv().await } } -fn attach_room_id(raw_ev: &Raw, room_id: &RoomId) -> Raw { - let mut ev_obj = raw_ev.deserialize_as::>>().unwrap(); - ev_obj.insert("room_id".to_owned(), serde_json::value::to_raw_value(room_id).unwrap()); - Raw::new(&ev_obj).unwrap().cast() +fn with_attached_room_id( + raw: &Raw, + room_id: &RoomId, +) -> Result> { + match raw.deserialize_as::>>() { + Ok(mut ev_mut) => { + ev_mut.insert("room_id".to_owned(), serde_json::value::to_raw_value(room_id)?); + Ok(Raw::new(&ev_mut)?.cast()) + } + Err(e) => Err(Error::from(e)), + } +} + +fn with_attached_encryption_flag( + raw: Raw, + encryption_info: &Option, +) -> Result> { + match raw.deserialize_as::>>() { + Ok(mut ev_mut) => { + let encrypted = encryption_info.is_some(); + ev_mut.insert("encrypted".to_owned(), serde_json::value::to_raw_value(&encrypted)?); + Ok(Raw::new(&ev_mut)?.cast()) + } + Err(e) => Err(Error::from(e)), + } } diff --git a/crates/matrix-sdk/src/widget/mod.rs b/crates/matrix-sdk/src/widget/mod.rs index 56f823ebd..e9c8703d0 100644 --- a/crates/matrix-sdk/src/widget/mod.rs +++ b/crates/matrix-sdk/src/widget/mod.rs @@ -40,7 +40,7 @@ mod settings; pub use self::{ capabilities::{Capabilities, CapabilitiesProvider}, - filter::{EventFilter, MessageLikeEventFilter, StateEventFilter}, + filter::{Filter, MessageLikeEventFilter, StateEventFilter, ToDeviceEventFilter}, settings::{ ClientProperties, EncryptionSystem, Intent, VirtualElementCallWidgetOptions, WidgetSettings, }, @@ -237,7 +237,16 @@ impl WidgetDriver { .await .map(MatrixDriverResponse::MatrixDelayedEventUpdate), - MatrixDriverRequestData::SendToDeviceEvent(req) => todo!(), + MatrixDriverRequestData::SendToDeviceEvent(send_to_device_request) => { + matrix_driver + .send_to_device( + send_to_device_request.event_type, + send_to_device_request.encrypted, + send_to_device_request.messages, + ) + .await + .map(MatrixDriverResponse::MatrixToDeviceSent) + } }; // Forward the matrix driver response to the incoming message stream. @@ -259,7 +268,8 @@ impl WidgetDriver { self.event_forwarding_guard = Some(guard); - let mut matrix = matrix_driver.events(); + let mut timeline_receiver = matrix_driver.events(); + let mut to_device_receiver = matrix_driver.to_device_events(); let incoming_msg_tx = incoming_msg_tx.clone(); spawn(async move { @@ -270,10 +280,15 @@ impl WidgetDriver { return; } - Some(event) = matrix.recv() => { + Some(event) = timeline_receiver.recv() => { // Forward all events to the incoming messages stream. let _ = incoming_msg_tx.send(IncomingMessage::MatrixEventReceived(event)); } + + Some(event) = to_device_receiver.recv() => { + // Forward all events to the incoming messages stream. + let _ = incoming_msg_tx.send(IncomingMessage::ToDeviceReceived(event)); + } } } });