refactor(widget): Use streams to streamline the action processing logic

This commit is contained in:
Damir Jelić
2025-05-07 10:48:58 +02:00
parent f042084bd2
commit 35a2ce97d8

View File

@@ -17,10 +17,12 @@
use std::{fmt, time::Duration};
use async_channel::{Receiver, Sender};
use futures_util::StreamExt;
use matrix_sdk_common::executor::spawn;
use ruma::api::client::delayed_events::DelayParameters;
use serde::de::{self, Deserialize, Deserializer, Visitor};
use tokio::sync::mpsc::{unbounded_channel, UnboundedSender};
use tokio_stream::wrappers::UnboundedReceiverStream;
use tokio_util::sync::{CancellationToken, DropGuard};
use self::{
@@ -137,12 +139,18 @@ impl WidgetDriver {
// - all incoming messages from the widget
// - all responses from the Matrix driver
// - all events from the Matrix driver, if subscribed
let (incoming_msg_tx, mut incoming_msg_rx) = unbounded_channel();
let (incoming_msg_tx, incoming_msg_rx) = unbounded_channel();
// Forward all of the incoming messages from the widget.
// TODO: This spawns a detached task, it would be nice to have an owner for this
// task. One way to achieve this if `WidgetDriver::run()` returns a handle that
// we can drop which will clean up the task and the channels. It's not too bad,
// since canelling `run()` will drop the sender this task listens which finishes
// the task.
spawn({
let incoming_msg_tx = incoming_msg_tx.clone();
let from_widget_rx = self.from_widget_rx.clone();
async move {
while let Ok(msg) = from_widget_rx.recv().await {
let _ = incoming_msg_tx.send(IncomingMessage::WidgetMessage(msg));
@@ -150,7 +158,9 @@ impl WidgetDriver {
}
});
// Create widget API machine.
// Create the widget API machine. The widget machine will process messages it
// receives from the widget and convert it into actions the `MatrixDriver` will
// then execute on.
let (mut widget_machine, initial_actions) = WidgetMachine::new(
self.settings.widget_id().to_owned(),
room.room_id().to_owned(),
@@ -159,25 +169,19 @@ impl WidgetDriver {
let matrix_driver = MatrixDriver::new(room.clone());
// Process initial actions that "initialise" the widget api machine.
for action in initial_actions {
// Convert the incoming message receiver into a stream of actions.
let stream = UnboundedReceiverStream::new(incoming_msg_rx)
.flat_map(|message| tokio_stream::iter(widget_machine.process(message)));
// Let's combine our set of initial actions with the stream of received actions.
let mut combined = tokio_stream::iter(initial_actions).chain(stream);
// Let's now process all actions we receive forever.
while let Some(action) = combined.next().await {
self.process_action(&matrix_driver, &incoming_msg_tx, &capabilities_provider, action)
.await?;
}
// Process incoming messages as they're coming in.
while let Some(msg) = incoming_msg_rx.recv().await {
for action in widget_machine.process(msg) {
self.process_action(
&matrix_driver,
&incoming_msg_tx,
&capabilities_provider,
action,
)
.await?;
}
}
Ok(())
}