Revert "feat(sdk): Remove NotificationSettings::subscribe_to_changes."

This reverts commit 4e291205d5.
This commit is contained in:
Kévin Commaille
2024-08-31 14:33:45 +02:00
committed by Ivan Enderlin
parent c4624cc863
commit 84e4552da7
2 changed files with 56 additions and 5 deletions

View File

@@ -25,7 +25,10 @@ use ruma::{
push::{Action, PredefinedUnderrideRuleId, RuleKind, Ruleset, Tweak},
RoomId,
};
use tokio::sync::RwLock;
use tokio::sync::{
broadcast::{self, Receiver},
RwLock,
};
use tracing::{debug, error};
use self::{command::Command, rule_commands::RuleCommands, rules::Rules};
@@ -88,29 +91,40 @@ pub struct NotificationSettings {
rules: Arc<RwLock<Rules>>,
/// Drop guard of event handler for push rules event.
_push_rules_event_handler_guard: Arc<EventHandlerDropGuard>,
changes_sender: broadcast::Sender<()>,
}
impl NotificationSettings {
/// Build a new `NotificationSettings`.
/// Build a new `NotificationSettings``
///
/// # Arguments
///
/// * `client` - A `Client` used to perform API calls
/// * `ruleset` - A `Ruleset` containing account's owner push rules
pub(crate) fn new(client: Client, ruleset: Ruleset) -> Self {
let changes_sender = broadcast::Sender::new(100);
let rules = Arc::new(RwLock::new(Rules::new(ruleset)));
// Listen for PushRulesEvent
let push_rules_event_handler_handle = client.add_event_handler({
let changes_sender = changes_sender.clone();
let rules = Arc::clone(&rules);
move |ev: PushRulesEvent| async move {
*rules.write().await = Rules::new(ev.content.global);
let _ = changes_sender.send(());
}
});
let _push_rules_event_handler_guard =
client.event_handler_drop_guard(push_rules_event_handler_handle).into();
Self { client, rules, _push_rules_event_handler_guard }
Self { client, rules, _push_rules_event_handler_guard, changes_sender }
}
/// Subscribe to changes in the `NotificationSettings`.
///
/// Changes can happen due to local changes or changes in another session.
pub fn subscribe_to_changes(&self) -> Receiver<()> {
self.changes_sender.subscribe()
}
/// Get the user defined notification mode for a room.
@@ -514,6 +528,7 @@ mod tests {
use matrix_sdk_test::{
async_test,
notification_settings::{build_ruleset, get_server_default_ruleset},
test_json,
};
use ruma::{
push::{
@@ -522,12 +537,16 @@ mod tests {
},
OwnedRoomId, RoomId,
};
use serde_json::json;
use stream_assert::{assert_next_eq, assert_pending};
use tokio_stream::wrappers::BroadcastStream;
use wiremock::{
matchers::{method, path, path_regex},
matchers::{header, method, path, path_regex},
Mock, MockServer, ResponseTemplate,
};
use crate::{
config::SyncSettings,
error::NotificationSettingsError,
notification_settings::{
IsEncrypted, IsOneToOne, NotificationSettings, RoomNotificationMode,
@@ -555,6 +574,36 @@ mod tests {
settings.rules.read().await.get_custom_rules_for_room(room_id)
}
#[async_test]
async fn subscribe_to_changes() {
let server = MockServer::start().await;
let client = logged_in_client(Some(server.uri())).await;
let settings = client.notification_settings().await;
Mock::given(method("GET"))
.and(path("/_matrix/client/r0/sync"))
.and(header("authorization", "Bearer 1234"))
.respond_with(ResponseTemplate::new(200).set_body_json(json!({
"next_batch": "1234",
"account_data": {
"events": [*test_json::PUSH_RULES]
}
})))
.expect(1)
.mount(&server)
.await;
let subscriber = settings.subscribe_to_changes();
let mut stream = BroadcastStream::new(subscriber);
assert_pending!(stream);
client.sync_once(SyncSettings::default()).await.unwrap();
assert_next_eq!(stream, Ok(()));
assert_pending!(stream);
}
#[async_test]
async fn test_get_custom_rules_for_room() {
let server = MockServer::start().await;

View File

@@ -581,6 +581,8 @@ async fn test_room_notification_count() -> Result<()> {
// Now Alice is only interesting in mentions of their name.
let settings = alice.notification_settings().await;
let mut settings_changes = settings.subscribe_to_changes();
tracing::warn!("Updating room notification mode to mentions and keywords only...");
settings
.set_room_notification_mode(
@@ -591,7 +593,7 @@ async fn test_room_notification_count() -> Result<()> {
tracing::warn!("Done!");
// Wait for remote echo.
tokio::time::sleep(Duration::from_secs(3))
timeout(Duration::from_secs(3), settings_changes.recv())
.await
.expect("timeout when waiting for settings update")
.expect("should've received echo after updating settings");