mirror of
https://github.com/matrix-org/matrix-rust-sdk.git
synced 2026-06-18 21:28:33 -04:00
WidgetDriver: add matrix driver toDevice support (reading and sending events via cs api)
This also hooks up the widget via the machine actions. And adds toDevice events to the subscription.
This commit is contained in:
@@ -17,28 +17,36 @@
|
||||
|
||||
use std::collections::BTreeMap;
|
||||
|
||||
use matrix_sdk_base::deserialized_responses::RawAnySyncOrStrippedState;
|
||||
use futures_util::future::join_all;
|
||||
use matrix_sdk_base::deserialized_responses::{EncryptionInfo, RawAnySyncOrStrippedState};
|
||||
use matrix_sdk_common::executor::spawn;
|
||||
use ruma::{
|
||||
api::client::{
|
||||
account::request_openid_token::v3::{Request as OpenIdRequest, Response as OpenIdResponse},
|
||||
delayed_events::{self, update_delayed_event::unstable::UpdateAction},
|
||||
filter::RoomEventFilter,
|
||||
to_device::send_event_to_device::{self, v3::Request as RumaToDeviceRequest},
|
||||
},
|
||||
assign,
|
||||
events::{
|
||||
AnyMessageLikeEventContent, AnyStateEventContent, AnySyncMessageLikeEvent,
|
||||
AnySyncStateEvent, AnySyncTimelineEvent, AnyTimelineEvent, MessageLikeEventType,
|
||||
StateEventType, TimelineEventType,
|
||||
AnySyncStateEvent, AnySyncTimelineEvent, AnyTimelineEvent, AnyToDeviceEvent,
|
||||
AnyToDeviceEventContent, MessageLikeEventType, StateEventType, TimelineEventType,
|
||||
ToDeviceEventType,
|
||||
},
|
||||
serde::{from_raw_json_value, Raw},
|
||||
EventId, RoomId, TransactionId,
|
||||
to_device::DeviceIdOrAllDevices,
|
||||
EventId, OwnedUserId, RoomId, TransactionId, UserId,
|
||||
};
|
||||
use serde_json::{value::RawValue as RawJsonValue, Value};
|
||||
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver};
|
||||
use tracing::error;
|
||||
use tracing::{error, info, warn};
|
||||
|
||||
use super::{machine::SendEventResponse, StateKeySelector};
|
||||
use crate::{event_handler::EventHandlerDropGuard, room::MessagesOptions, Error, Result, Room};
|
||||
use crate::{
|
||||
encryption::identities::Device, event_handler::EventHandlerDropGuard, room::MessagesOptions,
|
||||
Client, Error, Result, Room,
|
||||
};
|
||||
|
||||
/// Thin wrapper around a [`Room`] that provides functionality relevant for
|
||||
/// widgets.
|
||||
@@ -86,7 +94,11 @@ impl MatrixDriver {
|
||||
) -> Result<Vec<Raw<AnyTimelineEvent>>> {
|
||||
let room_id = self.room.room_id();
|
||||
let convert = |sync_or_stripped_state| match sync_or_stripped_state {
|
||||
RawAnySyncOrStrippedState::Sync(ev) => Some(attach_room_id(ev.cast_ref(), room_id)),
|
||||
RawAnySyncOrStrippedState::Sync(ev) => with_attached_room_id(ev.cast_ref(), room_id)
|
||||
.map_err(|e| {
|
||||
error!("failed to convert event from `get_state_event` response:{}", e)
|
||||
})
|
||||
.ok(),
|
||||
RawAnySyncOrStrippedState::Stripped(_) => {
|
||||
error!("MatrixDriver can't operate in invited rooms");
|
||||
None
|
||||
@@ -181,7 +193,7 @@ impl MatrixDriver {
|
||||
|
||||
/// Starts forwarding new room events. Once the returned `EventReceiver`
|
||||
/// is dropped, forwarding will be stopped.
|
||||
pub(crate) fn events(&self) -> EventReceiver {
|
||||
pub(crate) fn events(&self) -> EventReceiver<Raw<AnyTimelineEvent>> {
|
||||
let (tx, rx) = unbounded_channel();
|
||||
let room_id = self.room.room_id().to_owned();
|
||||
|
||||
@@ -190,14 +202,29 @@ impl MatrixDriver {
|
||||
let _room_id = room_id.clone();
|
||||
let handle_msg_like =
|
||||
self.room.add_event_handler(move |raw: Raw<AnySyncMessageLikeEvent>| {
|
||||
let _ = _tx.send(attach_room_id(raw.cast_ref(), &_room_id));
|
||||
match with_attached_room_id(raw.cast_ref(), &_room_id) {
|
||||
Ok(event_with_room_id) => {
|
||||
let _ = _tx.send(event_with_room_id);
|
||||
}
|
||||
Err(e) => {
|
||||
error!("Failed to attach room id to message like event: {}", e);
|
||||
}
|
||||
}
|
||||
async {}
|
||||
});
|
||||
let drop_guard_msg_like = self.room.client().event_handler_drop_guard(handle_msg_like);
|
||||
|
||||
let _room_id = room_id;
|
||||
let _tx = tx;
|
||||
// Get only all state events from the state section of the sync.
|
||||
let handle_state = self.room.add_event_handler(move |raw: Raw<AnySyncStateEvent>| {
|
||||
let _ = tx.send(attach_room_id(raw.cast_ref(), &room_id));
|
||||
match with_attached_room_id(raw.cast_ref(), &_room_id) {
|
||||
Ok(event_with_room_id) => {
|
||||
let _ = _tx.send(event_with_room_id);
|
||||
}
|
||||
Err(e) => {
|
||||
error!("Failed to attach room id to state event: {}", e);
|
||||
}
|
||||
}
|
||||
async {}
|
||||
});
|
||||
let drop_guard_state = self.room.client().event_handler_drop_guard(handle_state);
|
||||
@@ -208,25 +235,214 @@ impl MatrixDriver {
|
||||
// section of the sync will not be forwarded to the widget.
|
||||
// TODO annotate the events and send both timeline and state section state
|
||||
// events.
|
||||
EventReceiver { rx, _drop_guards: [drop_guard_msg_like, drop_guard_state] }
|
||||
EventReceiver { rx, _drop_guards: vec![drop_guard_msg_like, drop_guard_state] }
|
||||
}
|
||||
|
||||
/// Starts forwarding new room events. Once the returned `EventReceiver`
|
||||
/// is dropped, forwarding will be stopped.
|
||||
pub(crate) fn to_device_events(&self) -> EventReceiver<Raw<AnyToDeviceEvent>> {
|
||||
let (tx, rx) = unbounded_channel();
|
||||
|
||||
let to_device_handle = self.room.client().add_event_handler(
|
||||
move |raw: Raw<AnyToDeviceEvent>, encryption_info: Option<EncryptionInfo>| {
|
||||
match with_attached_encryption_flag(raw, &encryption_info) {
|
||||
Ok(ev) => {
|
||||
let _ = tx.send(ev);
|
||||
}
|
||||
Err(e) => {
|
||||
error!("Failed to attach encryption flag to to_device event: {}", e);
|
||||
}
|
||||
}
|
||||
async {}
|
||||
},
|
||||
);
|
||||
|
||||
let drop_guard = self.room.client().event_handler_drop_guard(to_device_handle);
|
||||
EventReceiver { rx, _drop_guards: vec![drop_guard] }
|
||||
}
|
||||
|
||||
/// It will ignore all devices where errors occurred or where the device is
|
||||
/// not verified or where th user has a has_verification_violation.
|
||||
pub(crate) async fn send_to_device(
|
||||
&self,
|
||||
event_type: ToDeviceEventType,
|
||||
encrypted: bool,
|
||||
messages: BTreeMap<
|
||||
OwnedUserId,
|
||||
BTreeMap<DeviceIdOrAllDevices, Raw<AnyToDeviceEventContent>>,
|
||||
>,
|
||||
) -> Result<send_event_to_device::v3::Response> {
|
||||
let client = self.room.client();
|
||||
/// This encrypts to device content for a collection of devices.
|
||||
/// It will ignore all devices where errors occurred or where the device
|
||||
/// is not verified or where th user has a has_verification_violation.
|
||||
async fn encrypted_content_for_devices(
|
||||
unencrypted_content: &Raw<AnyToDeviceEventContent>,
|
||||
devices: Vec<Device>,
|
||||
event_type: &ToDeviceEventType,
|
||||
) -> Result<impl Iterator<Item = (DeviceIdOrAllDevices, Raw<AnyToDeviceEventContent>)>>
|
||||
{
|
||||
let content: Value =
|
||||
unencrypted_content.deserialize_as().map_err(Into::<Error>::into)?;
|
||||
let event_type = event_type.clone();
|
||||
let device_content_tasks = devices.into_iter().map(|device| spawn({
|
||||
let event_type = event_type.clone();
|
||||
let content = content.clone();
|
||||
|
||||
async move {
|
||||
if !device.is_cross_signed_by_owner() {
|
||||
info!("Device {} is not verified, skipping encryption", device.device_id());
|
||||
return None;
|
||||
}
|
||||
match device
|
||||
.inner
|
||||
.encrypt_event_raw(&event_type.to_string(), &content)
|
||||
.await {
|
||||
Ok(encrypted) => Some((device.device_id().to_owned().into(), encrypted.cast())),
|
||||
Err(e) =>{ info!("Failed to encrypt to_device event from widget for device: {} because, {}", device.device_id(), e); None},
|
||||
}
|
||||
}
|
||||
}));
|
||||
let device_encrypted_content_map =
|
||||
join_all(device_content_tasks).await.into_iter().flatten().flatten();
|
||||
Ok(device_encrypted_content_map)
|
||||
}
|
||||
|
||||
/// Convert the device content map for one user into the same content
|
||||
/// map with encrypted content This needs to flatten the vectors
|
||||
/// we get from `encrypted_content_for_devices`
|
||||
/// since one `DeviceIdOrAllDevices` id can be multiple devices.
|
||||
async fn encrypted_device_content_map(
|
||||
client: &Client,
|
||||
user_id: &UserId,
|
||||
event_type: &ToDeviceEventType,
|
||||
device_content_map: BTreeMap<DeviceIdOrAllDevices, Raw<AnyToDeviceEventContent>>,
|
||||
) -> BTreeMap<DeviceIdOrAllDevices, Raw<AnyToDeviceEventContent>> {
|
||||
let device_map_futures =
|
||||
device_content_map.into_iter().map(|(device_or_all_id, content)| spawn({
|
||||
let client = client.clone();
|
||||
let user_id = user_id.to_owned();
|
||||
let event_type = event_type.clone();
|
||||
async move {
|
||||
let Ok(user_devices) = client.encryption().get_user_devices(&user_id).await else {
|
||||
warn!("Failed to get user devices for user: {}", user_id);
|
||||
return None;
|
||||
};
|
||||
let Ok(user_identity) = client.encryption().get_user_identity(&user_id).await else{
|
||||
warn!("Failed to get user identity for user: {}", user_id);
|
||||
return None;
|
||||
};
|
||||
if user_identity.map(|i|i.has_verification_violation()).unwrap_or(false) {
|
||||
info!("User {} has a verification violation, skipping encryption", user_id);
|
||||
return None;
|
||||
}
|
||||
let devices: Vec<Device> = match device_or_all_id {
|
||||
DeviceIdOrAllDevices::DeviceId(device_id) => {
|
||||
vec![user_devices.get(&device_id)].into_iter().flatten().collect()
|
||||
}
|
||||
DeviceIdOrAllDevices::AllDevices => user_devices.devices().collect(),
|
||||
};
|
||||
encrypted_content_for_devices(
|
||||
&content,
|
||||
devices,
|
||||
&event_type,
|
||||
)
|
||||
.await
|
||||
.map_err(|e| info!("WidgetDriver: could not encrypt content for to device widget event content: {}. because, {}", content.json(), e))
|
||||
.ok()
|
||||
}}));
|
||||
let content_map_iterator = join_all(device_map_futures).await.into_iter();
|
||||
|
||||
// The first flatten takes the iterator over Result<Option<impl Iterator<Item =
|
||||
// (DeviceIdOrAllDevices, Raw<AnyToDeviceEventContent>)>>, JoinError>>
|
||||
// and flattens the Result (drops Err() items)
|
||||
// The second takes the iterator over: Option<impl Iterator<Item =
|
||||
// (DeviceIdOrAllDevices, Raw<AnyToDeviceEventContent>)>>
|
||||
// and flattens the Option (drops None items)
|
||||
// The third takes the iterator over iterators: impl Iterator<Item =
|
||||
// (DeviceIdOrAllDevices, Raw<AnyToDeviceEventContent>)>
|
||||
// and flattens it to just an iterator over (DeviceIdOrAllDevices,
|
||||
// Raw<AnyToDeviceEventContent>)
|
||||
content_map_iterator.flatten().flatten().flatten().collect()
|
||||
}
|
||||
|
||||
let request = if encrypted {
|
||||
// We first want to get all missing session before we start any to device
|
||||
// sending!
|
||||
client.claim_one_time_keys(messages.keys().map(|u| u.as_ref())).await?;
|
||||
let encrypted_content: BTreeMap<
|
||||
OwnedUserId,
|
||||
BTreeMap<DeviceIdOrAllDevices, Raw<AnyToDeviceEventContent>>,
|
||||
> = join_all(messages.into_iter().map(|(user_id, device_content_map)| {
|
||||
let event_type = event_type.clone();
|
||||
async move {
|
||||
(
|
||||
user_id.clone(),
|
||||
encrypted_device_content_map(
|
||||
&self.room.client(),
|
||||
&user_id,
|
||||
&event_type,
|
||||
device_content_map,
|
||||
)
|
||||
.await,
|
||||
)
|
||||
}
|
||||
}))
|
||||
.await
|
||||
.into_iter()
|
||||
.collect();
|
||||
|
||||
RumaToDeviceRequest::new_raw(
|
||||
ToDeviceEventType::RoomEncrypted,
|
||||
TransactionId::new(),
|
||||
encrypted_content,
|
||||
)
|
||||
} else {
|
||||
RumaToDeviceRequest::new_raw(event_type, TransactionId::new(), messages)
|
||||
};
|
||||
|
||||
let response = client.send(request).await;
|
||||
|
||||
response.map_err(Into::into)
|
||||
}
|
||||
}
|
||||
|
||||
/// A simple entity that wraps an `UnboundedReceiver`
|
||||
/// along with the drop guard for the room event handler.
|
||||
pub(crate) struct EventReceiver {
|
||||
rx: UnboundedReceiver<Raw<AnyTimelineEvent>>,
|
||||
_drop_guards: [EventHandlerDropGuard; 2],
|
||||
pub(crate) struct EventReceiver<E> {
|
||||
rx: UnboundedReceiver<E>,
|
||||
_drop_guards: Vec<EventHandlerDropGuard>,
|
||||
}
|
||||
|
||||
impl EventReceiver {
|
||||
pub(crate) async fn recv(&mut self) -> Option<Raw<AnyTimelineEvent>> {
|
||||
impl<T> EventReceiver<T> {
|
||||
pub(crate) async fn recv(&mut self) -> Option<T> {
|
||||
self.rx.recv().await
|
||||
}
|
||||
}
|
||||
|
||||
fn attach_room_id(raw_ev: &Raw<AnySyncTimelineEvent>, room_id: &RoomId) -> Raw<AnyTimelineEvent> {
|
||||
let mut ev_obj = raw_ev.deserialize_as::<BTreeMap<String, Box<RawJsonValue>>>().unwrap();
|
||||
ev_obj.insert("room_id".to_owned(), serde_json::value::to_raw_value(room_id).unwrap());
|
||||
Raw::new(&ev_obj).unwrap().cast()
|
||||
fn with_attached_room_id(
|
||||
raw: &Raw<AnySyncTimelineEvent>,
|
||||
room_id: &RoomId,
|
||||
) -> Result<Raw<AnyTimelineEvent>> {
|
||||
match raw.deserialize_as::<BTreeMap<String, Box<RawJsonValue>>>() {
|
||||
Ok(mut ev_mut) => {
|
||||
ev_mut.insert("room_id".to_owned(), serde_json::value::to_raw_value(room_id)?);
|
||||
Ok(Raw::new(&ev_mut)?.cast())
|
||||
}
|
||||
Err(e) => Err(Error::from(e)),
|
||||
}
|
||||
}
|
||||
|
||||
fn with_attached_encryption_flag(
|
||||
raw: Raw<AnyToDeviceEvent>,
|
||||
encryption_info: &Option<EncryptionInfo>,
|
||||
) -> Result<Raw<AnyToDeviceEvent>> {
|
||||
match raw.deserialize_as::<BTreeMap<String, Box<RawJsonValue>>>() {
|
||||
Ok(mut ev_mut) => {
|
||||
let encrypted = encryption_info.is_some();
|
||||
ev_mut.insert("encrypted".to_owned(), serde_json::value::to_raw_value(&encrypted)?);
|
||||
Ok(Raw::new(&ev_mut)?.cast())
|
||||
}
|
||||
Err(e) => Err(Error::from(e)),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -40,7 +40,7 @@ mod settings;
|
||||
|
||||
pub use self::{
|
||||
capabilities::{Capabilities, CapabilitiesProvider},
|
||||
filter::{EventFilter, MessageLikeEventFilter, StateEventFilter},
|
||||
filter::{Filter, MessageLikeEventFilter, StateEventFilter, ToDeviceEventFilter},
|
||||
settings::{
|
||||
ClientProperties, EncryptionSystem, Intent, VirtualElementCallWidgetOptions, WidgetSettings,
|
||||
},
|
||||
@@ -237,7 +237,16 @@ impl WidgetDriver {
|
||||
.await
|
||||
.map(MatrixDriverResponse::MatrixDelayedEventUpdate),
|
||||
|
||||
MatrixDriverRequestData::SendToDeviceEvent(req) => todo!(),
|
||||
MatrixDriverRequestData::SendToDeviceEvent(send_to_device_request) => {
|
||||
matrix_driver
|
||||
.send_to_device(
|
||||
send_to_device_request.event_type,
|
||||
send_to_device_request.encrypted,
|
||||
send_to_device_request.messages,
|
||||
)
|
||||
.await
|
||||
.map(MatrixDriverResponse::MatrixToDeviceSent)
|
||||
}
|
||||
};
|
||||
|
||||
// Forward the matrix driver response to the incoming message stream.
|
||||
@@ -259,7 +268,8 @@ impl WidgetDriver {
|
||||
|
||||
self.event_forwarding_guard = Some(guard);
|
||||
|
||||
let mut matrix = matrix_driver.events();
|
||||
let mut timeline_receiver = matrix_driver.events();
|
||||
let mut to_device_receiver = matrix_driver.to_device_events();
|
||||
let incoming_msg_tx = incoming_msg_tx.clone();
|
||||
|
||||
spawn(async move {
|
||||
@@ -270,10 +280,15 @@ impl WidgetDriver {
|
||||
return;
|
||||
}
|
||||
|
||||
Some(event) = matrix.recv() => {
|
||||
Some(event) = timeline_receiver.recv() => {
|
||||
// Forward all events to the incoming messages stream.
|
||||
let _ = incoming_msg_tx.send(IncomingMessage::MatrixEventReceived(event));
|
||||
}
|
||||
|
||||
Some(event) = to_device_receiver.recv() => {
|
||||
// Forward all events to the incoming messages stream.
|
||||
let _ = incoming_msg_tx.send(IncomingMessage::ToDeviceReceived(event));
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
Reference in New Issue
Block a user