From 35a2ce97d84af8cea6b9e1850a8ff4ff5c66d712 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Damir=20Jeli=C4=87?= Date: Wed, 7 May 2025 10:48:58 +0200 Subject: [PATCH] refactor(widget): Use streams to streamline the action processing logic --- crates/matrix-sdk/src/widget/mod.rs | 38 ++++++++++++++++------------- 1 file changed, 21 insertions(+), 17 deletions(-) diff --git a/crates/matrix-sdk/src/widget/mod.rs b/crates/matrix-sdk/src/widget/mod.rs index a8a4f24e3..3c8c1e417 100644 --- a/crates/matrix-sdk/src/widget/mod.rs +++ b/crates/matrix-sdk/src/widget/mod.rs @@ -17,10 +17,12 @@ use std::{fmt, time::Duration}; use async_channel::{Receiver, Sender}; +use futures_util::StreamExt; use matrix_sdk_common::executor::spawn; use ruma::api::client::delayed_events::DelayParameters; use serde::de::{self, Deserialize, Deserializer, Visitor}; use tokio::sync::mpsc::{unbounded_channel, UnboundedSender}; +use tokio_stream::wrappers::UnboundedReceiverStream; use tokio_util::sync::{CancellationToken, DropGuard}; use self::{ @@ -137,12 +139,18 @@ impl WidgetDriver { // - all incoming messages from the widget // - all responses from the Matrix driver // - all events from the Matrix driver, if subscribed - let (incoming_msg_tx, mut incoming_msg_rx) = unbounded_channel(); + let (incoming_msg_tx, incoming_msg_rx) = unbounded_channel(); // Forward all of the incoming messages from the widget. + // TODO: This spawns a detached task, it would be nice to have an owner for this + // task. One way to achieve this if `WidgetDriver::run()` returns a handle that + // we can drop which will clean up the task and the channels. It's not too bad, + // since canelling `run()` will drop the sender this task listens which finishes + // the task. spawn({ let incoming_msg_tx = incoming_msg_tx.clone(); let from_widget_rx = self.from_widget_rx.clone(); + async move { while let Ok(msg) = from_widget_rx.recv().await { let _ = incoming_msg_tx.send(IncomingMessage::WidgetMessage(msg)); @@ -150,7 +158,9 @@ impl WidgetDriver { } }); - // Create widget API machine. + // Create the widget API machine. The widget machine will process messages it + // receives from the widget and convert it into actions the `MatrixDriver` will + // then execute on. let (mut widget_machine, initial_actions) = WidgetMachine::new( self.settings.widget_id().to_owned(), room.room_id().to_owned(), @@ -159,25 +169,19 @@ impl WidgetDriver { let matrix_driver = MatrixDriver::new(room.clone()); - // Process initial actions that "initialise" the widget api machine. - for action in initial_actions { + // Convert the incoming message receiver into a stream of actions. + let stream = UnboundedReceiverStream::new(incoming_msg_rx) + .flat_map(|message| tokio_stream::iter(widget_machine.process(message))); + + // Let's combine our set of initial actions with the stream of received actions. + let mut combined = tokio_stream::iter(initial_actions).chain(stream); + + // Let's now process all actions we receive forever. + while let Some(action) = combined.next().await { self.process_action(&matrix_driver, &incoming_msg_tx, &capabilities_provider, action) .await?; } - // Process incoming messages as they're coming in. - while let Some(msg) = incoming_msg_rx.recv().await { - for action in widget_machine.process(msg) { - self.process_action( - &matrix_driver, - &incoming_msg_tx, - &capabilities_provider, - action, - ) - .await?; - } - } - Ok(()) }