From 1fa488ec049a2763e42f62989d8855c8fbfe16ad Mon Sep 17 00:00:00 2001 From: ganfra Date: Fri, 26 Jan 2024 15:36:50 +0100 Subject: [PATCH] typing : add subscribe_to_typing_notifications and expose through ffi --- bindings/matrix-sdk-ffi/src/room.rs | 17 ++++ crates/matrix-sdk/src/room/mod.rs | 29 ++++++- .../tests/integration/room/joined.rs | 81 ++++++++++++++++++- 3 files changed, 122 insertions(+), 5 deletions(-) diff --git a/bindings/matrix-sdk-ffi/src/room.rs b/bindings/matrix-sdk-ffi/src/room.rs index 54288286c..f18d057a3 100644 --- a/bindings/matrix-sdk-ffi/src/room.rs +++ b/bindings/matrix-sdk-ffi/src/room.rs @@ -505,6 +505,18 @@ impl Room { pub async fn typing_notice(&self, is_typing: bool) -> Result<(), ClientError> { Ok(self.inner.typing_notice(is_typing).await?) } + pub async fn subscribe_to_typing_notifications( + self: Arc, + listener: Box, + ) -> Arc { + Arc::new(TaskHandle::new(RUNTIME.spawn(async move { + let (_guard, mut subscriber) = self.inner.subscribe_to_typing_notifications(); + while let Ok(typing_users) = subscriber.recv().await { + let typing_users = typing_users.iter().map(|u| u.to_string()).collect(); + listener.call(typing_users); + } + }))) + } } #[uniffi::export(callback_interface)] @@ -512,6 +524,11 @@ pub trait RoomInfoListener: Sync + Send { fn call(&self, room_info: RoomInfo); } +#[uniffi::export(callback_interface)] +pub trait TypingNotificationsListener: Sync + Send { + fn call(&self, typing_users: Vec); +} + #[derive(uniffi::Object)] pub struct RoomMembersIterator { chunk_iterator: ChunkIterator, diff --git a/crates/matrix-sdk/src/room/mod.rs b/crates/matrix-sdk/src/room/mod.rs index 509045cc0..d2692a2c1 100644 --- a/crates/matrix-sdk/src/room/mod.rs +++ b/crates/matrix-sdk/src/room/mod.rs @@ -1,6 +1,12 @@ //! High-level room API -use std::{borrow::Borrow, collections::BTreeMap, ops::Deref, time::Duration}; +use std::{ + borrow::Borrow, + collections::BTreeMap, + ops::Deref, + sync::{mpsc::channel, Arc}, + time::Duration, +}; use eyeball::SharedObservable; use futures_core::Stream; @@ -57,6 +63,7 @@ use ruma::{ }, space::{child::SpaceChildEventContent, parent::SpaceParentEventContent}, tag::{TagInfo, TagName}, + typing::SyncTypingEvent, AnyRoomAccountDataEvent, AnyStateEvent, EmptyStateKey, MessageLikeEventContent, MessageLikeEventType, RedactContent, RedactedStateEventContent, RoomAccountDataEvent, RoomAccountDataEventContent, RoomAccountDataEventType, StateEventContent, StateEventType, @@ -76,7 +83,7 @@ use self::futures::{SendAttachment, SendMessageLikeEvent, SendRawMessageLikeEven use crate::{ attachment::AttachmentConfig, error::WrongRoomState, - event_handler::{EventHandler, EventHandlerHandle, SyncEvent}, + event_handler::{EventHandler, EventHandlerDropGuard, EventHandlerHandle, SyncEvent}, media::{MediaFormat, MediaRequest}, notification_settings::{IsEncrypted, IsOneToOne, RoomNotificationMode}, sync::RoomUpdate, @@ -315,6 +322,24 @@ impl Room { self.client.subscribe_to_room_updates(self.room_id()) } + /// Subscribe to typing notifications for this room. + /// + /// The returned receiver will receive a new vector of user IDs for each + /// sync response that contains 'm.typing' event. + pub fn subscribe_to_typing_notifications( + &self, + ) -> (EventHandlerDropGuard, broadcast::Receiver>) { + let (sender, receiver) = broadcast::channel(16); + let typing_event_handler_handle = self.client.add_room_event_handler(self.room_id(), { + let sender = sender.clone(); + move |ev: SyncTypingEvent| async move { + let _ = sender.send(ev.content.user_ids); + } + }); + let guard = self.client().event_handler_drop_guard(typing_event_handler_handle); + (guard, receiver) + } + /// Fetch the event with the given `EventId` in this room. pub async fn event(&self, event_id: &EventId) -> Result { let request = diff --git a/crates/matrix-sdk/tests/integration/room/joined.rs b/crates/matrix-sdk/tests/integration/room/joined.rs index 84ab142c4..38c565f6c 100644 --- a/crates/matrix-sdk/tests/integration/room/joined.rs +++ b/crates/matrix-sdk/tests/integration/room/joined.rs @@ -1,4 +1,7 @@ -use std::time::Duration; +use std::{ + sync::{Arc, Mutex}, + time::Duration, +}; use futures_util::future::join_all; use matrix_sdk::{ @@ -10,12 +13,15 @@ use matrix_sdk::{ room::{Receipts, ReportedContentScore}, }; use matrix_sdk_base::RoomState; -use matrix_sdk_test::{async_test, test_json, DEFAULT_TEST_ROOM_ID}; +use matrix_sdk_test::{ + async_test, test_json, EphemeralTestEvent, JoinedRoomBuilder, SyncResponseBuilder, + DEFAULT_TEST_ROOM_ID, +}; use ruma::{ api::client::{membership::Invite3pidInit, receipt::create_receipt::v3::ReceiptType}, assign, event_id, events::{receipt::ReceiptThread, room::message::RoomMessageEventContent}, - int, mxc_uri, owned_event_id, thirdparty, uint, user_id, TransactionId, + int, mxc_uri, owned_event_id, room_id, thirdparty, uint, user_id, OwnedUserId, TransactionId, }; use serde_json::json; use wiremock::{ @@ -642,3 +648,72 @@ async fn report_content() { room.report_content(event_id, Some(score), Some(reason.to_owned())).await.unwrap(); } + +#[async_test] +async fn subscribe_to_typing_notifications() { + let (client, server) = logged_in_client().await; + let mut typing_sequences: Arc>>> = Arc::new(Mutex::new(Vec::new())); + let asserted_typing_sequences = + vec![vec![user_id!("@alice:matrix.org"), user_id!("@bob:example.com")], vec![]]; + let room_id = room_id!("!test:example.org"); + let mut ev_builder = SyncResponseBuilder::new(); + + // Initial sync with our test room. + ev_builder.add_joined_room(JoinedRoomBuilder::new(room_id)); + mock_sync(&server, ev_builder.build_json_sync_response(), None).await; + let sync_settings = SyncSettings::new().timeout(Duration::from_millis(3000)); + let _response = client.sync_once(sync_settings.clone()).await.unwrap(); + server.reset().await; + + // Send to typing notification + let room = client.get_room(room_id).unwrap(); + let typing_seq = Arc::clone(&typing_sequences); + let handle = tokio::spawn(async move { + let (_guard, mut subscriber) = room.subscribe_to_typing_notifications(); + while let Ok(typing_users) = subscriber.recv().await { + let mut typings = typing_seq.lock().unwrap(); + typings.push(typing_users); + } + }); + // Then send a typing notification with 2 users typing + ev_builder.add_joined_room(JoinedRoomBuilder::new(room_id).add_ephemeral_event( + EphemeralTestEvent::Custom(json!({ + "content": { + "user_ids": [ + "@alice:matrix.org", + "@bob:example.com" + ] + }, + "room_id": "!jEsUZKDJdhlrceRyVU:example.org", + "type": "m.typing" + })), + )); + mock_sync(&server, ev_builder.build_json_sync_response(), None).await; + let _response = client.sync_once(sync_settings.clone()).await.unwrap(); + server.reset().await; + + // Then send a typing notification with no users typings + ev_builder.add_joined_room(JoinedRoomBuilder::new(room_id).add_ephemeral_event( + EphemeralTestEvent::Custom(json!({ + "content": { + "user_ids": [] + }, + "room_id": "!jEsUZKDJdhlrceRyVU:example.org", + "type": "m.typing" + })), + )); + mock_sync(&server, ev_builder.build_json_sync_response(), None).await; + let _response = client.sync_once(sync_settings.clone()).await.unwrap(); + server.reset().await; + + // Cancel the task, makes sure the subscription is cancelled too. + handle.abort(); + ev_builder.add_joined_room( + JoinedRoomBuilder::new(room_id).add_ephemeral_event(EphemeralTestEvent::Typing), + ); + mock_sync(&server, ev_builder.build_json_sync_response(), None).await; + let _response = client.sync_once(sync_settings.clone()).await.unwrap(); + server.reset().await; + + assert_eq!(typing_sequences.lock().unwrap().to_vec(), asserted_typing_sequences); +}