feat(sdk): Implement Client::observe_events and Client::observe_room_events.

Changelog: This patch introduces a mechanism similar to
 `Client::add_event_handler` and `Client::add_room_event_handler`
 but with a reactive programming pattern. This patch adds
 `Client::observe_events` and `Client::observe_room_events`.

 ```rust
 // Get an observer.
 let observer =
     client.observe_events::<SyncRoomMessageEvent, (Room, Vec<Action>)>();

 // Subscribe to the observer.
 let mut subscriber = observer.subscribe();

 // Use the subscriber as a `Stream`.
 let (message_event, (room, push_actions)) = subscriber.next().await.unwrap();
 ```

 When calling `observe_events`, one has to specify the type of event
 (in the example, `SyncRoomMessageEvent`) and a context (in the example,
 `(Room, Vec<Action>)`, respectively for the room and the push actions).
This commit is contained in:
Ivan Enderlin
2024-11-12 18:01:49 +01:00
parent e798a51709
commit 6cef7f20c5
4 changed files with 373 additions and 6 deletions

1
Cargo.lock generated
View File

@@ -2914,6 +2914,7 @@ dependencies = [
"mime2ext",
"once_cell",
"openidconnect",
"pin-project-lite",
"proptest",
"rand",
"reqwest",

View File

@@ -97,6 +97,7 @@ matrix-sdk-sqlite = { workspace = true, optional = true }
matrix-sdk-test = { workspace = true, optional = true }
mime = "0.3.16"
mime2ext = "0.1.52"
pin-project-lite = { workspace = true }
rand = { workspace = true , optional = true }
ruma = { workspace = true, features = ["rand", "unstable-msc2448", "unstable-msc2965", "unstable-msc3930", "unstable-msc3245-v1-compat", "unstable-msc2867"] }
serde = { workspace = true }

View File

@@ -17,7 +17,7 @@
use std::{
collections::{btree_map, BTreeMap},
fmt::{self, Debug},
future::Future,
future::{ready, Future},
pin::Pin,
sync::{Arc, Mutex as StdMutex, RwLock as StdRwLock, Weak},
};
@@ -88,7 +88,8 @@ use crate::{
error::{HttpError, HttpResult},
event_cache::EventCache,
event_handler::{
EventHandler, EventHandlerDropGuard, EventHandlerHandle, EventHandlerStore, SyncEvent,
EventHandler, EventHandlerContext, EventHandlerDropGuard, EventHandlerHandle,
EventHandlerStore, ObservableEventHandler, SyncEvent,
},
http_client::HttpClient,
matrix_auth::MatrixAuth,
@@ -776,7 +777,7 @@ impl Client {
/// ```
pub fn add_event_handler<Ev, Ctx, H>(&self, handler: H) -> EventHandlerHandle
where
Ev: SyncEvent + DeserializeOwned + Send + 'static,
Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + 'static,
H: EventHandler<Ev, Ctx>,
{
self.add_event_handler_impl(handler, None)
@@ -798,12 +799,96 @@ impl Client {
handler: H,
) -> EventHandlerHandle
where
Ev: SyncEvent + DeserializeOwned + Send + 'static,
Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + 'static,
H: EventHandler<Ev, Ctx>,
{
self.add_event_handler_impl(handler, Some(room_id.to_owned()))
}
/// Observe a specific event type.
///
/// `Ev` represents the kind of event that will be observed. `Ctx`
/// represents the context that will come with the event. It relies on the
/// same mechanism as [`Self::add_event_handler`]. The main difference is
/// that it returns an [`ObservableEventHandler`] and doesn't require a
/// user-defined closure. It is possible to subscribe to the
/// [`ObservableEventHandler`] to get an [`EventHandlerSubscriber`], which
/// implements a [`Stream`]. The `Stream::Item` will be of type `(Ev,
/// Ctx)`.
///
/// # Example
///
/// ```
/// use futures_util::StreamExt as _;
/// use matrix_sdk::{
/// ruma::{events::room::message::SyncRoomMessageEvent, push::Action},
/// Client, Room,
/// };
///
/// # async fn example(client: Client) {
/// let observer =
/// client.observe_events::<SyncRoomMessageEvent, (Room, Vec<Action>)>();
///
/// let mut subscriber = observer.subscribe();
///
/// let (message_event, (room, push_actions)) =
/// subscriber.next().await.unwrap();
/// # }
/// ```
///
/// [`EventHandlerSubscriber`]: crate::event_handler::EventHandlerSubscriber
pub fn observe_events<Ev, Ctx>(&self) -> ObservableEventHandler<(Ev, Ctx)>
where
Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + SyncOutsideWasm + 'static,
Ctx: EventHandlerContext + SendOutsideWasm + SyncOutsideWasm + 'static,
{
self.observe_room_events_impl(None)
}
/// Observe a specific room, and event type.
///
/// This method works the same way as
/// [`observe_events`][Self::observe_events], except that the observability
/// will only be applied for events in the room with the specified ID.
/// See that method for more details.
pub fn observe_room_events<Ev, Ctx>(
&self,
room_id: &RoomId,
) -> ObservableEventHandler<(Ev, Ctx)>
where
Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + SyncOutsideWasm + 'static,
Ctx: EventHandlerContext + SendOutsideWasm + SyncOutsideWasm + 'static,
{
self.observe_room_events_impl(Some(room_id.to_owned()))
}
/// Shared implementation for `Self::observe_events` and
/// `Self::observe_room_events`.
fn observe_room_events_impl<Ev, Ctx>(
&self,
room_id: Option<OwnedRoomId>,
) -> ObservableEventHandler<(Ev, Ctx)>
where
Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + SyncOutsideWasm + 'static,
Ctx: EventHandlerContext + SendOutsideWasm + SyncOutsideWasm + 'static,
{
// The default value is `None`. It becomes `Some((Ev, Ctx))` once it has a
// new value.
let shared_observable = SharedObservable::new(None);
ObservableEventHandler::new(
shared_observable.clone(),
self.event_handler_drop_guard(self.add_event_handler_impl(
move |event: Ev, context: Ctx| {
shared_observable.set(Some((event, context)));
ready(())
},
room_id,
)),
)
}
/// Remove the event handler associated with the handle.
///
/// Note that you **must not** call `remove_event_handler` from the

View File

@@ -40,16 +40,20 @@ use std::{
pin::Pin,
sync::{
atomic::{AtomicU64, Ordering::SeqCst},
RwLock,
Arc, RwLock, Weak,
},
task::{Context, Poll},
};
use anymap2::any::CloneAnySendSync;
use eyeball::{SharedObservable, Subscriber};
use futures_core::Stream;
use futures_util::stream::{FuturesUnordered, StreamExt};
use matrix_sdk_base::{
deserialized_responses::{EncryptionInfo, SyncTimelineEvent},
SendOutsideWasm, SyncOutsideWasm,
};
use pin_project_lite::pin_project;
use ruma::{events::AnySyncStateEvent, push::Action, serde::Raw, OwnedRoomId};
use serde::{de::DeserializeOwned, Deserialize};
use serde_json::value::RawValue as RawJsonValue;
@@ -287,7 +291,7 @@ impl Client {
room_id: Option<OwnedRoomId>,
) -> EventHandlerHandle
where
Ev: SyncEvent + DeserializeOwned + Send + 'static,
Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + 'static,
H: EventHandler<Ev, Ctx>,
{
let handler_fn: Box<EventHandlerFn> = Box::new(move |data| {
@@ -535,11 +539,139 @@ impl_event_handler!(A, B, C, D, E, F);
impl_event_handler!(A, B, C, D, E, F, G);
impl_event_handler!(A, B, C, D, E, F, G, H);
/// An observer of events (may be tailored to a room).
///
/// To create such observer, use [`Client::observe_events`] or
/// [`Client::observe_room_events`].
#[derive(Debug)]
pub struct ObservableEventHandler<T> {
/// This type is actually nothing more than a thin glue layer between the
/// [`EventHandler`] mechanism and the reactive programming types from
/// [`eyeball`]. Here, we use a [`SharedObservable`] that is updated by the
/// [`EventHandler`].
shared_observable: SharedObservable<Option<T>>,
/// This type owns the [`EventHandlerDropGuard`]. As soon as this type goes
/// out of scope, the event handler is unregistered/removed.
///
/// [`EventHandlerSubscriber`] holds a weak, non-owning reference, to this
/// guard. It is useful to detect when to close the [`Stream`]: as soon as
/// this type goes out of scope, the subscriber will close itself on poll.
event_handler_guard: Arc<EventHandlerDropGuard>,
}
impl<T> ObservableEventHandler<T> {
pub(crate) fn new(
shared_observable: SharedObservable<Option<T>>,
event_handler_guard: EventHandlerDropGuard,
) -> Self {
Self { shared_observable, event_handler_guard: Arc::new(event_handler_guard) }
}
/// Subscribe to this observer.
///
/// It returns an [`EventHandlerSubscriber`], which implements [`Stream`].
/// See its documentation to learn more.
pub fn subscribe(&self) -> EventHandlerSubscriber<T> {
EventHandlerSubscriber::new(
self.shared_observable.subscribe(),
// The subscriber holds a weak non-owning reference to the event handler guard, so that
// it can detect when this observer is dropped, and can close the subscriber's stream.
Arc::downgrade(&self.event_handler_guard),
)
}
}
pin_project! {
/// The subscriber of an [`ObservableEventHandler`].
///
/// To create such subscriber, use [`ObservableEventHandler::subscribe`].
///
/// This type implements [`Stream`], which means it is possible to poll the
/// next value asynchronously. In other terms, polling this type will return
/// the new event as soon as they are synced. See [`Client::observe_events`]
/// to learn more.
#[derive(Debug)]
pub struct EventHandlerSubscriber<T> {
// The `Subscriber` associated to the `SharedObservable` inside
// `ObservableEventHandle`.
//
// Keep in mind all this API is just a thin glue layer between
// `EventHandle` and `SharedObservable`, that's… maagiic!
#[pin]
subscriber: Subscriber<Option<T>>,
// A weak non-owning reference to the event handler guard from
// `ObservableEventHandler`. When this type is polled (via its `Stream`
// implementation), it is possible to detect whether the observable has
// been dropped by upgrading this weak reference, and close the `Stream`
// if it needs to.
event_handler_guard: Weak<EventHandlerDropGuard>,
}
}
impl<T> EventHandlerSubscriber<T> {
fn new(
subscriber: Subscriber<Option<T>>,
event_handler_handle: Weak<EventHandlerDropGuard>,
) -> Self {
Self { subscriber, event_handler_guard: event_handler_handle }
}
}
impl<T> Stream for EventHandlerSubscriber<T>
where
T: Clone,
{
type Item = T;
fn poll_next(self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.project();
let Some(_) = this.event_handler_guard.upgrade() else {
// The `EventHandlerHandle` has been dropped via `EventHandlerDropGuard`. It
// means the `ObservableEventHandler` has been dropped. It's time to
// close this stream.
return Poll::Ready(None);
};
// First off, the subscriber is of type `Subscriber<Option<T>>` because the
// `SharedObservable` starts with a `None` value to indicate it has no yet
// received any update. We want the `Stream` to return `T`, not `Option<T>`. We
// then filter out all `None` value.
//
// Second, when a `None` value is met, we want to poll again (hence the `loop`).
// At best, there is a new value to return. At worst, the subscriber will return
// `Poll::Pending` and will register the wakers accordingly.
loop {
match this.subscriber.as_mut().poll_next(context) {
// Stream has been closed somehow.
Poll::Ready(None) => return Poll::Ready(None),
// The initial value (of the `SharedObservable` behind `self.subscriber`) has been
// polled. We want to filter it out.
Poll::Ready(Some(None)) => {
// Loop over.
continue;
}
// We have a new value!
Poll::Ready(Some(Some(value))) => return Poll::Ready(Some(value)),
// Classical pending.
Poll::Pending => return Poll::Pending,
}
}
}
}
#[cfg(test)]
mod tests {
use matrix_sdk_test::{
async_test, InvitedRoomBuilder, JoinedRoomBuilder, DEFAULT_TEST_ROOM_ID,
};
use stream_assert::{assert_closed, assert_pending, assert_ready};
#[cfg(target_arch = "wasm32")]
wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);
use std::{
@@ -884,4 +1016,152 @@ mod tests {
assert_eq!(counter.load(SeqCst), 1);
Ok(())
}
#[async_test]
#[allow(dependency_on_unit_never_type_fallback)]
async fn test_observe_events() -> crate::Result<()> {
let client = logged_in_client(None).await;
let room_id_0 = room_id!("!r0.matrix.org");
let room_id_1 = room_id!("!r1.matrix.org");
let observable = client.observe_events::<OriginalSyncRoomNameEvent, Room>();
let mut subscriber = observable.subscribe();
assert_pending!(subscriber);
let mut response_builder = SyncResponseBuilder::new();
let response = response_builder
.add_joined_room(JoinedRoomBuilder::new(room_id_0).add_state_event(
StateTestEvent::Custom(json!({
"content": {
"name": "Name 0"
},
"event_id": "$ev0",
"origin_server_ts": 1,
"sender": "@mnt_io:matrix.org",
"state_key": "",
"type": "m.room.name",
"unsigned": {
"age": 1,
}
})),
))
.build_sync_response();
client.process_sync(response).await?;
let (room_name, room) = assert_ready!(subscriber);
assert_eq!(room_name.event_id.as_str(), "$ev0");
assert_eq!(room.room_id(), room_id_0);
assert_eq!(room.name().unwrap(), "Name 0");
assert_pending!(subscriber);
let response = response_builder
.add_joined_room(JoinedRoomBuilder::new(room_id_1).add_state_event(
StateTestEvent::Custom(json!({
"content": {
"name": "Name 1"
},
"event_id": "$ev1",
"origin_server_ts": 2,
"sender": "@mnt_io:matrix.org",
"state_key": "",
"type": "m.room.name",
"unsigned": {
"age": 2,
}
})),
))
.build_sync_response();
client.process_sync(response).await?;
let (room_name, room) = assert_ready!(subscriber);
assert_eq!(room_name.event_id.as_str(), "$ev1");
assert_eq!(room.room_id(), room_id_1);
assert_eq!(room.name().unwrap(), "Name 1");
assert_pending!(subscriber);
drop(observable);
assert_closed!(subscriber);
Ok(())
}
#[async_test]
#[allow(dependency_on_unit_never_type_fallback)]
async fn test_observe_room_events() -> crate::Result<()> {
let client = logged_in_client(None).await;
let room_id = room_id!("!r0.matrix.org");
let observable_for_room =
client.observe_room_events::<OriginalSyncRoomNameEvent, (Room, Client)>(room_id);
let mut subscriber_for_room = observable_for_room.subscribe();
assert_pending!(subscriber_for_room);
let mut response_builder = SyncResponseBuilder::new();
let response = response_builder
.add_joined_room(JoinedRoomBuilder::new(room_id).add_state_event(
StateTestEvent::Custom(json!({
"content": {
"name": "Name 0"
},
"event_id": "$ev0",
"origin_server_ts": 1,
"sender": "@mnt_io:matrix.org",
"state_key": "",
"type": "m.room.name",
"unsigned": {
"age": 1,
}
})),
))
.build_sync_response();
client.process_sync(response).await?;
let (room_name, (room, _client)) = assert_ready!(subscriber_for_room);
assert_eq!(room_name.event_id.as_str(), "$ev0");
assert_eq!(room.name().unwrap(), "Name 0");
assert_pending!(subscriber_for_room);
let response = response_builder
.add_joined_room(JoinedRoomBuilder::new(room_id).add_state_event(
StateTestEvent::Custom(json!({
"content": {
"name": "Name 1"
},
"event_id": "$ev1",
"origin_server_ts": 2,
"sender": "@mnt_io:matrix.org",
"state_key": "",
"type": "m.room.name",
"unsigned": {
"age": 2,
}
})),
))
.build_sync_response();
client.process_sync(response).await?;
let (room_name, (room, _client)) = assert_ready!(subscriber_for_room);
assert_eq!(room_name.event_id.as_str(), "$ev1");
assert_eq!(room.name().unwrap(), "Name 1");
assert_pending!(subscriber_for_room);
drop(observable_for_room);
assert_closed!(subscriber_for_room);
Ok(())
}
}