refactor(widget): tidy up and start commenting the widget code

This commit is contained in:
Benjamin Bouvier
2024-11-14 10:54:07 +01:00
parent 02c7c2cdfc
commit 8070e3c165
4 changed files with 107 additions and 75 deletions

View File

@@ -12,8 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.
#![allow(dead_code)] // temporary
use ruma::events::{MessageLikeEventType, StateEventType, TimelineEventType};
use serde::Deserialize;

View File

@@ -124,7 +124,6 @@ pub(super) enum ReadEventRequest {
event_type: StateEventType,
state_key: StateKeySelector,
},
#[allow(dead_code)]
ReadMessageLikeEvent {
#[serde(rename = "type")]
event_type: MessageLikeEventType,

View File

@@ -14,8 +14,6 @@
//! No I/O logic of the [`WidgetDriver`].
#![warn(unreachable_pub)]
use std::{fmt, iter, time::Duration};
use driver_req::UpdateDelayedEventRequest;
@@ -54,7 +52,6 @@ use super::{
filter::{MatrixEventContent, MatrixEventFilterInput},
Capabilities, StateKeySelector,
};
use crate::widget::EventFilter;
mod driver_req;
mod from_widget;
@@ -72,7 +69,7 @@ pub(crate) use self::{
};
/// Action (a command) that client (driver) must perform.
#[derive(Clone, Debug)]
#[derive(Debug)]
pub(crate) enum Action {
/// Send a raw message to the widget.
SendToWidget(String),
@@ -94,23 +91,33 @@ pub(crate) enum Action {
/// Subscribe to the events in the *current* room, i.e. a room which this
/// widget is instantiated with. The client is aware of the room.
#[allow(dead_code)]
Subscribe,
/// Unsuscribe from the events in the *current* room. Symmetrical to
/// `Subscribe`.
#[allow(dead_code)]
Unsubscribe,
}
/// No I/O state machine.
///
/// Handles interactions with the widget as well as the `MatrixDriver`.
/// Handles interactions with the widget as well as the
/// [`crate::widget::MatrixDriver`].
pub(crate) struct WidgetMachine {
/// Unique identifier for the widget.
///
/// Allows distinguishing different widgets.
widget_id: String,
/// The room to which this widget machine is attached.
room_id: OwnedRoomId,
/// Outstanding requests sent to the widget (mapped by uuid).
pending_to_widget_requests: PendingRequests<ToWidgetRequestMeta>,
/// Outstanding requests sent to the matrix driver (mapped by uuid).
pending_matrix_driver_requests: PendingRequests<MatrixDriverRequestMeta>,
/// Current negotiation state for capabilities.
capabilities: CapabilitiesState,
}
@@ -149,9 +156,11 @@ impl WidgetMachine {
match event {
IncomingMessage::WidgetMessage(raw) => self.process_widget_message(&raw),
IncomingMessage::MatrixDriverResponse { request_id, response } => {
self.process_matrix_driver_response(request_id, response)
}
IncomingMessage::MatrixEventReceived(event) => {
let CapabilitiesState::Negotiated(capabilities) = &self.capabilities else {
error!("Received matrix event before capabilities negotiation");
@@ -172,10 +181,8 @@ impl WidgetMachine {
fn process_widget_message(&mut self, raw: &str) -> Vec<Action> {
let message = match serde_json::from_str::<IncomingWidgetMessage>(raw) {
Ok(msg) => msg,
Err(e) => {
// TODO: There is a special error handling required for the invalid
// messages. Refer to the `widget-api-poc` for implementation notes.
error!("Failed to parse incoming message: {e}");
Err(error) => {
error!("couldn't deserialize incoming widget message: {error}");
return Vec::new();
}
};
@@ -203,21 +210,22 @@ impl WidgetMachine {
) -> Vec<Action> {
let request = match raw_request.deserialize() {
Ok(r) => r,
Err(e) => return vec![self.send_from_widget_error_response(raw_request, e)],
Err(e) => return vec![Self::send_from_widget_error_response(raw_request, e)],
};
match request {
FromWidgetRequest::SupportedApiVersions {} => {
let response = SupportedApiVersionsResponse::new();
vec![self.send_from_widget_response(raw_request, response)]
vec![Self::send_from_widget_response(raw_request, response)]
}
FromWidgetRequest::ContentLoaded {} => {
let response = vec![self.send_from_widget_response(raw_request, JsonObject::new())];
self.capabilities
.is_unset()
.then(|| [&response, self.negotiate_capabilities().as_slice()].concat())
.unwrap_or(response)
let mut response =
vec![Self::send_from_widget_response(raw_request, JsonObject::new())];
if self.capabilities.is_unset() {
response.append(&mut self.negotiate_capabilities());
}
response
}
FromWidgetRequest::ReadEvent(req) => self
@@ -245,17 +253,20 @@ impl WidgetMachine {
action.map(|a| vec![a]).unwrap_or_default()
});
let response = self.send_from_widget_response(raw_request, OpenIdResponse::Pending);
let response =
Self::send_from_widget_response(raw_request, OpenIdResponse::Pending);
iter::once(response).chain(request_action).collect()
}
FromWidgetRequest::DelayedEventUpdate(req) => {
let CapabilitiesState::Negotiated(capabilities) = &self.capabilities else {
let text =
"Received send update delayed event request before capabilities were negotiated";
return vec![self.send_from_widget_error_response(raw_request, text)];
return vec![Self::send_from_widget_error_response(raw_request, text)];
};
if !capabilities.update_delayed_event {
return vec![self.send_from_widget_error_response(
return vec![Self::send_from_widget_error_response(
raw_request,
format!(
"Not allowed: missing the {} capability.",
@@ -263,19 +274,22 @@ impl WidgetMachine {
),
)];
}
let (request, request_action) =
self.send_matrix_driver_request(UpdateDelayedEventRequest {
action: req.action,
delay_id: req.delay_id,
});
request.then(|res, machine| {
vec![machine.send_from_widget_result_response(
request.then(|res, _machine| {
vec![Self::send_from_widget_result_response(
raw_request,
// This is mapped to another type because the update_delay_event::Response
// does not impl Serialize
res.map(Into::<UpdateDelayedEventResponse>::into),
)]
});
request_action.map(|a| vec![a]).unwrap_or_default()
}
}
@@ -288,14 +302,14 @@ impl WidgetMachine {
) -> Option<Action> {
let CapabilitiesState::Negotiated(capabilities) = &self.capabilities else {
let text = "Received read event request before capabilities were negotiated";
return Some(self.send_from_widget_error_response(raw_request, text));
return Some(Self::send_from_widget_error_response(raw_request, text));
};
match request {
ReadEventRequest::ReadMessageLikeEvent { event_type, limit } => {
let filter_fn = |f: &EventFilter| f.matches_message_like_event_type(&event_type);
if !capabilities.read.iter().any(filter_fn) {
return Some(self.send_from_widget_error_response(
if !capabilities.read.iter().any(|f| f.matches_message_like_event_type(&event_type))
{
return Some(Self::send_from_widget_error_response(
raw_request,
"Not allowed to read message like event",
));
@@ -304,7 +318,9 @@ impl WidgetMachine {
const DEFAULT_EVENT_LIMIT: u32 = 50;
let limit = limit.unwrap_or(DEFAULT_EVENT_LIMIT);
let request = ReadMessageLikeEventRequest { event_type, limit };
let (request, action) = self.send_matrix_driver_request(request);
request.then(|result, machine| {
let response = result.and_then(|mut events| {
let CapabilitiesState::Negotiated(capabilities) = &machine.capabilities
@@ -316,10 +332,13 @@ impl WidgetMachine {
events.retain(|e| capabilities.raw_event_matches_read_filter(e));
Ok(ReadEventResponse { events })
});
vec![machine.send_from_widget_result_response(raw_request, response)]
vec![Self::send_from_widget_result_response(raw_request, response)]
});
action
}
ReadEventRequest::ReadStateEvent { event_type, state_key } => {
let allowed = match &state_key {
StateKeySelector::Any => capabilities
@@ -342,13 +361,13 @@ impl WidgetMachine {
if allowed {
let request = ReadStateEventRequest { event_type, state_key };
let (request, action) = self.send_matrix_driver_request(request);
request.then(|result, machine| {
request.then(|result, _machine| {
let response = result.map(|events| ReadEventResponse { events });
vec![machine.send_from_widget_result_response(raw_request, response)]
vec![Self::send_from_widget_result_response(raw_request, response)]
});
action
} else {
Some(self.send_from_widget_error_response(
Some(Self::send_from_widget_error_response(
raw_request,
"Not allowed to read state event",
))
@@ -377,8 +396,9 @@ impl WidgetMachine {
Default::default()
}),
};
if !capabilities.send_delayed_event && request.delay.is_some() {
return Some(self.send_from_widget_error_response(
return Some(Self::send_from_widget_error_response(
raw_request,
format!(
"Not allowed: missing the {} capability.",
@@ -386,19 +406,23 @@ impl WidgetMachine {
),
));
}
if !capabilities.send.iter().any(|filter| filter.matches(&filter_in)) {
return Some(
self.send_from_widget_error_response(raw_request, "Not allowed to send event"),
);
return Some(Self::send_from_widget_error_response(
raw_request,
"Not allowed to send event",
));
}
let (request, action) = self.send_matrix_driver_request(request);
request.then(|mut result, machine| {
if let Ok(r) = result.as_mut() {
r.set_room_id(machine.room_id.clone());
}
vec![machine.send_from_widget_result_response(raw_request, result)]
vec![Self::send_from_widget_result_response(raw_request, result)]
});
action
}
@@ -453,38 +477,39 @@ impl WidgetMachine {
}
}
#[instrument(skip_all, fields(request_id))]
#[instrument(skip_all)]
fn send_from_widget_response(
&self,
raw_request: Raw<FromWidgetRequest>,
response_data: impl Serialize,
) -> Action {
let mut object = raw_request
.deserialize_as::<IndexMap<String, Box<RawJsonValue>>>()
.expect("Failed to converted FromWidgetRequest to object representation");
let response_data = serde_json::value::to_raw_value(&response_data)
.expect("Failed to serialize response data");
object.insert("response".to_owned(), response_data);
let serialized = serde_json::to_string(&object).expect("Failed to serialize response");
let f = || {
let mut object = raw_request.deserialize_as::<IndexMap<String, Box<RawJsonValue>>>()?;
let response_data = serde_json::value::to_raw_value(&response_data)?;
object.insert("response".to_owned(), response_data);
serde_json::to_string(&object)
};
// SAFETY: we expect the raw request to be a valid JSON map, to which we add a
// new field.
let serialized = f().expect("error when attaching response to incoming request");
Action::SendToWidget(serialized)
}
fn send_from_widget_error_response(
&self,
raw_request: Raw<FromWidgetRequest>,
error: impl fmt::Display,
) -> Action {
self.send_from_widget_response(raw_request, FromWidgetErrorResponse::new(error))
Self::send_from_widget_response(raw_request, FromWidgetErrorResponse::new(error))
}
fn send_from_widget_result_response(
&self,
raw_request: Raw<FromWidgetRequest>,
result: Result<impl Serialize, impl fmt::Display>,
) -> Action {
match result {
Ok(res) => self.send_from_widget_response(raw_request, res),
Err(msg) => self.send_from_widget_error_response(raw_request, msg),
Ok(res) => Self::send_from_widget_response(raw_request, res),
Err(msg) => Self::send_from_widget_error_response(raw_request, msg),
}
}
@@ -495,7 +520,7 @@ impl WidgetMachine {
) -> (ToWidgetRequestHandle<'_, T::ResponseData>, Option<Action>) {
#[derive(Serialize)]
#[serde(tag = "api", rename = "toWidget", rename_all = "camelCase")]
struct ToWidgetRequestSerHelper<'a, T> {
struct ToWidgetRequestSerdeHelper<'a, T> {
widget_id: &'a str,
request_id: Uuid,
action: &'static str,
@@ -503,7 +528,7 @@ impl WidgetMachine {
}
let request_id = Uuid::new_v4();
let full_request = ToWidgetRequestSerHelper {
let full_request = ToWidgetRequestSerdeHelper {
widget_id: &self.widget_id,
request_id,
action: T::ACTION,
@@ -561,7 +586,7 @@ impl WidgetMachine {
let update = NotifyCapabilitiesChanged { approved, requested };
let (_request, action) = machine.send_to_widget_request(update);
(subscribe_required).then(|| Action::Subscribe).into_iter().chain(action).collect()
subscribe_required.then(|| Action::Subscribe).into_iter().chain(action).collect()
});
action.map(|a| vec![a]).unwrap_or_default()
@@ -598,9 +623,13 @@ impl MatrixDriverRequestMeta {
}
}
/// Current negotiation state for capabilities.
enum CapabilitiesState {
/// Capabilities have never been defined.
Unset,
/// We're currently negotiating capabilities.
Negotiating,
/// The capabilities have already been negotiated.
Negotiated(Capabilities),
}

View File

@@ -115,26 +115,25 @@ impl WidgetDriver {
(driver, channels)
}
/// Starts a client widget API state machine for a given `widget` in a given
/// joined `room`. The function returns once the widget is disconnected or
/// any terminal error occurs.
/// Run client widget API state machine in a given joined `room` forever.
///
/// Not implemented yet! Currently, it does not contain any useful
/// functionality, it only blindly forwards the messages and returns errors
/// once a non-implemented part is triggered.
/// The function returns once the widget is disconnected or any terminal
/// error occurs.
pub async fn run(
self,
room: Room,
capabilities_provider: impl CapabilitiesProvider,
) -> Result<(), ()> {
// Create a channel so that we can conveniently send all events to it.
let (events_tx, mut events_rx) = unbounded_channel();
// Create a channel so that we can conveniently send all messages to it.
let (incoming_messages_tx, mut incoming_messages_rx) = unbounded_channel();
// Forward all of the incoming messages from the widget to the `events_tx`.
let tx = events_tx.clone();
tokio::spawn(async move {
while let Ok(msg) = self.from_widget_rx.recv().await {
let _ = tx.send(IncomingMessage::WidgetMessage(msg));
// Forward all of the incoming messages from the widget.
tokio::spawn({
let incoming_messages_tx = incoming_messages_tx.clone();
async move {
while let Ok(msg) = self.from_widget_rx.recv().await {
let _ = incoming_messages_tx.send(IncomingMessage::WidgetMessage(msg));
}
}
});
@@ -152,7 +151,7 @@ impl WidgetDriver {
matrix_driver: MatrixDriver::new(room.clone()),
event_forwarding_guard: None,
to_widget_tx: self.to_widget_tx,
events_tx,
events_tx: incoming_messages_tx,
capabilities_provider,
};
@@ -161,9 +160,9 @@ impl WidgetDriver {
ctx.process_action(action).await?;
}
// Process incoming events.
while let Some(event) = events_rx.recv().await {
ctx.process_event(event).await?;
// Process incoming messages as they're coming in.
while let Some(message) = incoming_messages_rx.recv().await {
ctx.process_incoming_message(message).await?;
}
Ok(())
@@ -181,8 +180,10 @@ struct ProcessingContext<T> {
}
impl<T: CapabilitiesProvider> ProcessingContext<T> {
async fn process_event(&mut self, event: IncomingMessage) -> Result<(), ()> {
for action in self.widget_machine.process(event) {
/// Compute the actions for a single given incoming message, and performs
/// them immediately.
async fn process_incoming_message(&mut self, msg: IncomingMessage) -> Result<(), ()> {
for action in self.widget_machine.process(msg) {
self.process_action(action).await?;
}
@@ -194,6 +195,7 @@ impl<T: CapabilitiesProvider> ProcessingContext<T> {
Action::SendToWidget(msg) => {
self.to_widget_tx.send(msg).await.map_err(|_| ())?;
}
Action::MatrixDriverRequest { request_id, data } => {
let response = match data {
MatrixDriverRequestData::AcquireCapabilities(cmd) => {
@@ -253,6 +255,7 @@ impl<T: CapabilitiesProvider> ProcessingContext<T> {
.send(IncomingMessage::MatrixDriverResponse { request_id, response })
.map_err(|_| ())?;
}
Action::Subscribe => {
// Only subscribe if we are not already subscribed.
if self.event_forwarding_guard.is_none() {
@@ -264,10 +267,12 @@ impl<T: CapabilitiesProvider> ProcessingContext<T> {
self.event_forwarding_guard = Some(guard);
let (mut matrix, events_tx) =
(self.matrix_driver.events(), self.events_tx.clone());
tokio::spawn(async move {
loop {
tokio::select! {
_ = stop_forwarding.cancelled() => { return }
Some(event) = matrix.recv() => {
let _ = events_tx.send(IncomingMessage::MatrixEventReceived(event));
}
@@ -276,6 +281,7 @@ impl<T: CapabilitiesProvider> ProcessingContext<T> {
});
}
}
Action::Unsubscribe => {
self.event_forwarding_guard = None;
}