From 8070e3c16579ea3be588786f01ccbb7d4a35804d Mon Sep 17 00:00:00 2001 From: Benjamin Bouvier Date: Thu, 14 Nov 2024 10:54:07 +0100 Subject: [PATCH] refactor(widget): tidy up and start commenting the widget code --- crates/matrix-sdk/src/widget/filter.rs | 2 - .../src/widget/machine/from_widget.rs | 1 - crates/matrix-sdk/src/widget/machine/mod.rs | 135 +++++++++++------- crates/matrix-sdk/src/widget/mod.rs | 44 +++--- 4 files changed, 107 insertions(+), 75 deletions(-) diff --git a/crates/matrix-sdk/src/widget/filter.rs b/crates/matrix-sdk/src/widget/filter.rs index c3daf2ce0..cfdca6291 100644 --- a/crates/matrix-sdk/src/widget/filter.rs +++ b/crates/matrix-sdk/src/widget/filter.rs @@ -12,8 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -#![allow(dead_code)] // temporary - use ruma::events::{MessageLikeEventType, StateEventType, TimelineEventType}; use serde::Deserialize; diff --git a/crates/matrix-sdk/src/widget/machine/from_widget.rs b/crates/matrix-sdk/src/widget/machine/from_widget.rs index 7dcb3c86a..e70d53cf7 100644 --- a/crates/matrix-sdk/src/widget/machine/from_widget.rs +++ b/crates/matrix-sdk/src/widget/machine/from_widget.rs @@ -124,7 +124,6 @@ pub(super) enum ReadEventRequest { event_type: StateEventType, state_key: StateKeySelector, }, - #[allow(dead_code)] ReadMessageLikeEvent { #[serde(rename = "type")] event_type: MessageLikeEventType, diff --git a/crates/matrix-sdk/src/widget/machine/mod.rs b/crates/matrix-sdk/src/widget/machine/mod.rs index eb0c52187..d3b600c2c 100644 --- a/crates/matrix-sdk/src/widget/machine/mod.rs +++ b/crates/matrix-sdk/src/widget/machine/mod.rs @@ -14,8 +14,6 @@ //! No I/O logic of the [`WidgetDriver`]. -#![warn(unreachable_pub)] - use std::{fmt, iter, time::Duration}; use driver_req::UpdateDelayedEventRequest; @@ -54,7 +52,6 @@ use super::{ filter::{MatrixEventContent, MatrixEventFilterInput}, Capabilities, StateKeySelector, }; -use crate::widget::EventFilter; mod driver_req; mod from_widget; @@ -72,7 +69,7 @@ pub(crate) use self::{ }; /// Action (a command) that client (driver) must perform. -#[derive(Clone, Debug)] +#[derive(Debug)] pub(crate) enum Action { /// Send a raw message to the widget. SendToWidget(String), @@ -94,23 +91,33 @@ pub(crate) enum Action { /// Subscribe to the events in the *current* room, i.e. a room which this /// widget is instantiated with. The client is aware of the room. - #[allow(dead_code)] Subscribe, /// Unsuscribe from the events in the *current* room. Symmetrical to /// `Subscribe`. - #[allow(dead_code)] Unsubscribe, } /// No I/O state machine. /// -/// Handles interactions with the widget as well as the `MatrixDriver`. +/// Handles interactions with the widget as well as the +/// [`crate::widget::MatrixDriver`]. pub(crate) struct WidgetMachine { + /// Unique identifier for the widget. + /// + /// Allows distinguishing different widgets. widget_id: String, + + /// The room to which this widget machine is attached. room_id: OwnedRoomId, + + /// Outstanding requests sent to the widget (mapped by uuid). pending_to_widget_requests: PendingRequests, + + /// Outstanding requests sent to the matrix driver (mapped by uuid). pending_matrix_driver_requests: PendingRequests, + + /// Current negotiation state for capabilities. capabilities: CapabilitiesState, } @@ -149,9 +156,11 @@ impl WidgetMachine { match event { IncomingMessage::WidgetMessage(raw) => self.process_widget_message(&raw), + IncomingMessage::MatrixDriverResponse { request_id, response } => { self.process_matrix_driver_response(request_id, response) } + IncomingMessage::MatrixEventReceived(event) => { let CapabilitiesState::Negotiated(capabilities) = &self.capabilities else { error!("Received matrix event before capabilities negotiation"); @@ -172,10 +181,8 @@ impl WidgetMachine { fn process_widget_message(&mut self, raw: &str) -> Vec { let message = match serde_json::from_str::(raw) { Ok(msg) => msg, - Err(e) => { - // TODO: There is a special error handling required for the invalid - // messages. Refer to the `widget-api-poc` for implementation notes. - error!("Failed to parse incoming message: {e}"); + Err(error) => { + error!("couldn't deserialize incoming widget message: {error}"); return Vec::new(); } }; @@ -203,21 +210,22 @@ impl WidgetMachine { ) -> Vec { let request = match raw_request.deserialize() { Ok(r) => r, - Err(e) => return vec![self.send_from_widget_error_response(raw_request, e)], + Err(e) => return vec![Self::send_from_widget_error_response(raw_request, e)], }; match request { FromWidgetRequest::SupportedApiVersions {} => { let response = SupportedApiVersionsResponse::new(); - vec![self.send_from_widget_response(raw_request, response)] + vec![Self::send_from_widget_response(raw_request, response)] } FromWidgetRequest::ContentLoaded {} => { - let response = vec![self.send_from_widget_response(raw_request, JsonObject::new())]; - self.capabilities - .is_unset() - .then(|| [&response, self.negotiate_capabilities().as_slice()].concat()) - .unwrap_or(response) + let mut response = + vec![Self::send_from_widget_response(raw_request, JsonObject::new())]; + if self.capabilities.is_unset() { + response.append(&mut self.negotiate_capabilities()); + } + response } FromWidgetRequest::ReadEvent(req) => self @@ -245,17 +253,20 @@ impl WidgetMachine { action.map(|a| vec![a]).unwrap_or_default() }); - let response = self.send_from_widget_response(raw_request, OpenIdResponse::Pending); + let response = + Self::send_from_widget_response(raw_request, OpenIdResponse::Pending); iter::once(response).chain(request_action).collect() } + FromWidgetRequest::DelayedEventUpdate(req) => { let CapabilitiesState::Negotiated(capabilities) = &self.capabilities else { let text = "Received send update delayed event request before capabilities were negotiated"; - return vec![self.send_from_widget_error_response(raw_request, text)]; + return vec![Self::send_from_widget_error_response(raw_request, text)]; }; + if !capabilities.update_delayed_event { - return vec![self.send_from_widget_error_response( + return vec![Self::send_from_widget_error_response( raw_request, format!( "Not allowed: missing the {} capability.", @@ -263,19 +274,22 @@ impl WidgetMachine { ), )]; } + let (request, request_action) = self.send_matrix_driver_request(UpdateDelayedEventRequest { action: req.action, delay_id: req.delay_id, }); - request.then(|res, machine| { - vec![machine.send_from_widget_result_response( + + request.then(|res, _machine| { + vec![Self::send_from_widget_result_response( raw_request, // This is mapped to another type because the update_delay_event::Response // does not impl Serialize res.map(Into::::into), )] }); + request_action.map(|a| vec![a]).unwrap_or_default() } } @@ -288,14 +302,14 @@ impl WidgetMachine { ) -> Option { let CapabilitiesState::Negotiated(capabilities) = &self.capabilities else { let text = "Received read event request before capabilities were negotiated"; - return Some(self.send_from_widget_error_response(raw_request, text)); + return Some(Self::send_from_widget_error_response(raw_request, text)); }; match request { ReadEventRequest::ReadMessageLikeEvent { event_type, limit } => { - let filter_fn = |f: &EventFilter| f.matches_message_like_event_type(&event_type); - if !capabilities.read.iter().any(filter_fn) { - return Some(self.send_from_widget_error_response( + if !capabilities.read.iter().any(|f| f.matches_message_like_event_type(&event_type)) + { + return Some(Self::send_from_widget_error_response( raw_request, "Not allowed to read message like event", )); @@ -304,7 +318,9 @@ impl WidgetMachine { const DEFAULT_EVENT_LIMIT: u32 = 50; let limit = limit.unwrap_or(DEFAULT_EVENT_LIMIT); let request = ReadMessageLikeEventRequest { event_type, limit }; + let (request, action) = self.send_matrix_driver_request(request); + request.then(|result, machine| { let response = result.and_then(|mut events| { let CapabilitiesState::Negotiated(capabilities) = &machine.capabilities @@ -316,10 +332,13 @@ impl WidgetMachine { events.retain(|e| capabilities.raw_event_matches_read_filter(e)); Ok(ReadEventResponse { events }) }); - vec![machine.send_from_widget_result_response(raw_request, response)] + + vec![Self::send_from_widget_result_response(raw_request, response)] }); + action } + ReadEventRequest::ReadStateEvent { event_type, state_key } => { let allowed = match &state_key { StateKeySelector::Any => capabilities @@ -342,13 +361,13 @@ impl WidgetMachine { if allowed { let request = ReadStateEventRequest { event_type, state_key }; let (request, action) = self.send_matrix_driver_request(request); - request.then(|result, machine| { + request.then(|result, _machine| { let response = result.map(|events| ReadEventResponse { events }); - vec![machine.send_from_widget_result_response(raw_request, response)] + vec![Self::send_from_widget_result_response(raw_request, response)] }); action } else { - Some(self.send_from_widget_error_response( + Some(Self::send_from_widget_error_response( raw_request, "Not allowed to read state event", )) @@ -377,8 +396,9 @@ impl WidgetMachine { Default::default() }), }; + if !capabilities.send_delayed_event && request.delay.is_some() { - return Some(self.send_from_widget_error_response( + return Some(Self::send_from_widget_error_response( raw_request, format!( "Not allowed: missing the {} capability.", @@ -386,19 +406,23 @@ impl WidgetMachine { ), )); } + if !capabilities.send.iter().any(|filter| filter.matches(&filter_in)) { - return Some( - self.send_from_widget_error_response(raw_request, "Not allowed to send event"), - ); + return Some(Self::send_from_widget_error_response( + raw_request, + "Not allowed to send event", + )); } let (request, action) = self.send_matrix_driver_request(request); + request.then(|mut result, machine| { if let Ok(r) = result.as_mut() { r.set_room_id(machine.room_id.clone()); } - vec![machine.send_from_widget_result_response(raw_request, result)] + vec![Self::send_from_widget_result_response(raw_request, result)] }); + action } @@ -453,38 +477,39 @@ impl WidgetMachine { } } - #[instrument(skip_all, fields(request_id))] + #[instrument(skip_all)] fn send_from_widget_response( - &self, raw_request: Raw, response_data: impl Serialize, ) -> Action { - let mut object = raw_request - .deserialize_as::>>() - .expect("Failed to converted FromWidgetRequest to object representation"); - let response_data = serde_json::value::to_raw_value(&response_data) - .expect("Failed to serialize response data"); - object.insert("response".to_owned(), response_data); - let serialized = serde_json::to_string(&object).expect("Failed to serialize response"); + let f = || { + let mut object = raw_request.deserialize_as::>>()?; + let response_data = serde_json::value::to_raw_value(&response_data)?; + object.insert("response".to_owned(), response_data); + serde_json::to_string(&object) + }; + + // SAFETY: we expect the raw request to be a valid JSON map, to which we add a + // new field. + let serialized = f().expect("error when attaching response to incoming request"); + Action::SendToWidget(serialized) } fn send_from_widget_error_response( - &self, raw_request: Raw, error: impl fmt::Display, ) -> Action { - self.send_from_widget_response(raw_request, FromWidgetErrorResponse::new(error)) + Self::send_from_widget_response(raw_request, FromWidgetErrorResponse::new(error)) } fn send_from_widget_result_response( - &self, raw_request: Raw, result: Result, ) -> Action { match result { - Ok(res) => self.send_from_widget_response(raw_request, res), - Err(msg) => self.send_from_widget_error_response(raw_request, msg), + Ok(res) => Self::send_from_widget_response(raw_request, res), + Err(msg) => Self::send_from_widget_error_response(raw_request, msg), } } @@ -495,7 +520,7 @@ impl WidgetMachine { ) -> (ToWidgetRequestHandle<'_, T::ResponseData>, Option) { #[derive(Serialize)] #[serde(tag = "api", rename = "toWidget", rename_all = "camelCase")] - struct ToWidgetRequestSerHelper<'a, T> { + struct ToWidgetRequestSerdeHelper<'a, T> { widget_id: &'a str, request_id: Uuid, action: &'static str, @@ -503,7 +528,7 @@ impl WidgetMachine { } let request_id = Uuid::new_v4(); - let full_request = ToWidgetRequestSerHelper { + let full_request = ToWidgetRequestSerdeHelper { widget_id: &self.widget_id, request_id, action: T::ACTION, @@ -561,7 +586,7 @@ impl WidgetMachine { let update = NotifyCapabilitiesChanged { approved, requested }; let (_request, action) = machine.send_to_widget_request(update); - (subscribe_required).then(|| Action::Subscribe).into_iter().chain(action).collect() + subscribe_required.then(|| Action::Subscribe).into_iter().chain(action).collect() }); action.map(|a| vec![a]).unwrap_or_default() @@ -598,9 +623,13 @@ impl MatrixDriverRequestMeta { } } +/// Current negotiation state for capabilities. enum CapabilitiesState { + /// Capabilities have never been defined. Unset, + /// We're currently negotiating capabilities. Negotiating, + /// The capabilities have already been negotiated. Negotiated(Capabilities), } diff --git a/crates/matrix-sdk/src/widget/mod.rs b/crates/matrix-sdk/src/widget/mod.rs index d5f7109e4..3176c5f08 100644 --- a/crates/matrix-sdk/src/widget/mod.rs +++ b/crates/matrix-sdk/src/widget/mod.rs @@ -115,26 +115,25 @@ impl WidgetDriver { (driver, channels) } - /// Starts a client widget API state machine for a given `widget` in a given - /// joined `room`. The function returns once the widget is disconnected or - /// any terminal error occurs. + /// Run client widget API state machine in a given joined `room` forever. /// - /// Not implemented yet! Currently, it does not contain any useful - /// functionality, it only blindly forwards the messages and returns errors - /// once a non-implemented part is triggered. + /// The function returns once the widget is disconnected or any terminal + /// error occurs. pub async fn run( self, room: Room, capabilities_provider: impl CapabilitiesProvider, ) -> Result<(), ()> { - // Create a channel so that we can conveniently send all events to it. - let (events_tx, mut events_rx) = unbounded_channel(); + // Create a channel so that we can conveniently send all messages to it. + let (incoming_messages_tx, mut incoming_messages_rx) = unbounded_channel(); - // Forward all of the incoming messages from the widget to the `events_tx`. - let tx = events_tx.clone(); - tokio::spawn(async move { - while let Ok(msg) = self.from_widget_rx.recv().await { - let _ = tx.send(IncomingMessage::WidgetMessage(msg)); + // Forward all of the incoming messages from the widget. + tokio::spawn({ + let incoming_messages_tx = incoming_messages_tx.clone(); + async move { + while let Ok(msg) = self.from_widget_rx.recv().await { + let _ = incoming_messages_tx.send(IncomingMessage::WidgetMessage(msg)); + } } }); @@ -152,7 +151,7 @@ impl WidgetDriver { matrix_driver: MatrixDriver::new(room.clone()), event_forwarding_guard: None, to_widget_tx: self.to_widget_tx, - events_tx, + events_tx: incoming_messages_tx, capabilities_provider, }; @@ -161,9 +160,9 @@ impl WidgetDriver { ctx.process_action(action).await?; } - // Process incoming events. - while let Some(event) = events_rx.recv().await { - ctx.process_event(event).await?; + // Process incoming messages as they're coming in. + while let Some(message) = incoming_messages_rx.recv().await { + ctx.process_incoming_message(message).await?; } Ok(()) @@ -181,8 +180,10 @@ struct ProcessingContext { } impl ProcessingContext { - async fn process_event(&mut self, event: IncomingMessage) -> Result<(), ()> { - for action in self.widget_machine.process(event) { + /// Compute the actions for a single given incoming message, and performs + /// them immediately. + async fn process_incoming_message(&mut self, msg: IncomingMessage) -> Result<(), ()> { + for action in self.widget_machine.process(msg) { self.process_action(action).await?; } @@ -194,6 +195,7 @@ impl ProcessingContext { Action::SendToWidget(msg) => { self.to_widget_tx.send(msg).await.map_err(|_| ())?; } + Action::MatrixDriverRequest { request_id, data } => { let response = match data { MatrixDriverRequestData::AcquireCapabilities(cmd) => { @@ -253,6 +255,7 @@ impl ProcessingContext { .send(IncomingMessage::MatrixDriverResponse { request_id, response }) .map_err(|_| ())?; } + Action::Subscribe => { // Only subscribe if we are not already subscribed. if self.event_forwarding_guard.is_none() { @@ -264,10 +267,12 @@ impl ProcessingContext { self.event_forwarding_guard = Some(guard); let (mut matrix, events_tx) = (self.matrix_driver.events(), self.events_tx.clone()); + tokio::spawn(async move { loop { tokio::select! { _ = stop_forwarding.cancelled() => { return } + Some(event) = matrix.recv() => { let _ = events_tx.send(IncomingMessage::MatrixEventReceived(event)); } @@ -276,6 +281,7 @@ impl ProcessingContext { }); } } + Action::Unsubscribe => { self.event_forwarding_guard = None; }