feat(room): create a cleanup task in Room::subscribe_to_knock_requests

This cleanup task will run while the knock request subscription runs and will use the `Room::room_member_updates_sender` notification to call `Room::remove_outdated_seen_knock_requests_ids` and remove outdated seen knock request ids automatically.
This commit is contained in:
Jorge Martín
2024-12-18 17:49:18 +01:00
committed by Jorge Martin Espinosa
parent 4a88e7cfee
commit 616c193a30
3 changed files with 143 additions and 16 deletions

View File

@@ -924,13 +924,15 @@ impl Room {
self: Arc<Self>,
listener: Box<dyn KnockRequestsListener>,
) -> Result<Arc<TaskHandle>, ClientError> {
let stream = self.inner.subscribe_to_knock_requests().await?;
let (stream, seen_ids_cleanup_handle) = self.inner.subscribe_to_knock_requests().await?;
let handle = Arc::new(TaskHandle::new(RUNTIME.spawn(async move {
pin_mut!(stream);
while let Some(requests) = stream.next().await {
listener.call(requests.into_iter().map(Into::into).collect());
}
// Cancel the seen ids cleanup task
seen_ids_cleanup_handle.abort();
})));
Ok(handle)

View File

@@ -47,7 +47,11 @@ use matrix_sdk_base::{
ComposerDraft, RoomInfoNotableUpdateReasons, RoomMemberships, StateChanges, StateStoreDataKey,
StateStoreDataValue,
};
use matrix_sdk_common::{deserialized_responses::SyncTimelineEvent, timeout::timeout};
use matrix_sdk_common::{
deserialized_responses::SyncTimelineEvent,
executor::{spawn, JoinHandle},
timeout::timeout,
};
use mime::Mime;
#[cfg(feature = "e2e-encryption")]
use ruma::events::{
@@ -3224,9 +3228,12 @@ impl Room {
/// - A knock request is marked as seen.
/// - A sync is gappy (limited), so room membership information may be
/// outdated.
///
/// Returns both a stream of knock requests and a handle for a task that
/// will clean up the seen knock request ids when possible.
pub async fn subscribe_to_knock_requests(
&self,
) -> Result<impl Stream<Item = Vec<KnockRequest>>> {
) -> Result<(impl Stream<Item = Vec<KnockRequest>>, JoinHandle<()>)> {
let this = Arc::new(self.clone());
let room_member_events_observer =
@@ -3241,6 +3248,21 @@ impl Room {
let mut room_info_stream = self.subscribe_info();
// Spawn a task that will clean up the seen knock request ids when updated room
// members are received
let clear_seen_ids_handle = spawn({
let this = self.clone();
async move {
let mut member_updates_stream = this.room_member_updates_sender.subscribe();
while member_updates_stream.recv().await.is_ok() {
// If room members were updated, try to remove outdated seen knock request ids
if let Err(err) = this.remove_outdated_seen_knock_requests_ids().await {
warn!("Failed to remove seen knock requests: {err}")
}
}
}
});
let combined_stream = stream! {
// Emit current requests to join
match this.get_current_join_requests(&current_seen_ids).await {
@@ -3315,7 +3337,7 @@ impl Room {
}
};
Ok(combined_stream)
Ok((combined_stream, clear_seen_ids_handle))
}
async fn get_current_join_requests(

View File

@@ -35,6 +35,7 @@ use ruma::{
};
use serde_json::{from_value, json, Value};
use stream_assert::assert_pending;
use tokio::time::sleep;
use wiremock::{
matchers::{body_json, body_partial_json, header, method, path_regex},
Mock, ResponseTemplate,
@@ -840,7 +841,7 @@ async fn test_enable_encryption_doesnt_stay_unencrypted() {
}
#[async_test]
async fn test_subscribe_to_requests_to_join() {
async fn test_subscribe_to_knock_requests() {
let server = MatrixMockServer::new().await;
let client = server.client_builder().build().await;
@@ -862,7 +863,7 @@ async fn test_subscribe_to_requests_to_join() {
server.mock_get_members().ok(vec![knock_event]).mock_once().mount().await;
let room = server.sync_joined_room(&client, room_id).await;
let stream = room.subscribe_to_knock_requests().await.unwrap();
let (stream, handle) = room.subscribe_to_knock_requests().await.unwrap();
pin_mut!(stream);
@@ -893,16 +894,30 @@ async fn test_subscribe_to_requests_to_join() {
.cast()]);
server.sync_room(&client, joined_room_builder).await;
// The knock requests are now empty
// The knock requests are now empty because we have new member events
let updated_requests = assert_next_with_timeout!(stream, 100);
assert!(updated_requests.is_empty());
// And it's emitted again because the seen id value has changed
let updated_requests = assert_next_with_timeout!(stream, 100);
assert!(updated_requests.is_empty());
// There should be no other knock requests
assert_pending!(stream)
assert_pending!(stream);
// The seen knock request id is no longer there because the associated knock
// request doesn't exist anymore
let seen_knock_request_ids = room
.get_seen_knock_request_ids()
.await
.expect("could not get current seen knock request ids");
assert!(seen_knock_request_ids.is_empty());
handle.abort();
}
#[async_test]
async fn test_subscribe_to_requests_to_join_reloads_members_on_limited_sync() {
async fn test_subscribe_to_knock_requests_reloads_members_on_limited_sync() {
let server = MatrixMockServer::new().await;
let client = server.client_builder().build().await;
@@ -930,7 +945,7 @@ async fn test_subscribe_to_requests_to_join_reloads_members_on_limited_sync() {
.await;
let room = server.sync_joined_room(&client, room_id).await;
let stream = room.subscribe_to_knock_requests().await.unwrap();
let (stream, handle) = room.subscribe_to_knock_requests().await.unwrap();
pin_mut!(stream);
@@ -946,7 +961,6 @@ async fn test_subscribe_to_requests_to_join_reloads_members_on_limited_sync() {
assert_next_with_timeout!(stream, 500);
// There should be no other knock requests
assert_pending!(stream)
assert_pending!(stream);
handle.abort();
@@ -973,7 +987,9 @@ async fn test_remove_outdated_seen_knock_requests_ids_when_membership_changed()
.cast();
// When syncing the room, we'll have a knock request coming from alice
let room = server.sync_room(&client, JoinedRoomBuilder::new(room_id).add_state_bulk(vec![knock_event])).await;
let room = server
.sync_room(&client, JoinedRoomBuilder::new(room_id).add_state_bulk(vec![knock_event]))
.await;
// We then mark the knock request as seen
room.mark_knock_requests_as_seen(&[user_id.to_owned()]).await.unwrap();
@@ -997,7 +1013,9 @@ async fn test_remove_outdated_seen_knock_requests_ids_when_membership_changed()
room.sync_members().await.expect("could not reload room members");
// Calling remove outdated seen knock request ids will remove the seen id
room.remove_outdated_seen_knock_requests_ids().await.expect("could not remove outdated seen knock request ids");
room.remove_outdated_seen_knock_requests_ids()
.await
.expect("could not remove outdated seen knock request ids");
let seen = room.get_seen_knock_request_ids().await.unwrap();
assert!(seen.is_empty());
@@ -1024,7 +1042,9 @@ async fn test_remove_outdated_seen_knock_requests_ids_when_we_have_an_outdated_k
.cast();
// When syncing the room, we'll have a knock request coming from alice
let room = server.sync_room(&client, JoinedRoomBuilder::new(room_id).add_state_bulk(vec![knock_event])).await;
let room = server
.sync_room(&client, JoinedRoomBuilder::new(room_id).add_state_bulk(vec![knock_event]))
.await;
// We then mark the knock request as seen
room.mark_knock_requests_as_seen(&[user_id.to_owned()]).await.unwrap();
@@ -1033,7 +1053,8 @@ async fn test_remove_outdated_seen_knock_requests_ids_when_we_have_an_outdated_k
let seen = room.get_seen_knock_request_ids().await.unwrap();
assert_eq!(seen.len(), 1);
// If we then load the members again and the previously knocking member has a different event id
// If we then load the members again and the previously knocking member has a
// different event id
let knock_event = f
.event(RoomMemberEventContent::new(MembershipState::Knock))
.event_id(event_id!("$knock-2:b.c"))
@@ -1048,8 +1069,90 @@ async fn test_remove_outdated_seen_knock_requests_ids_when_we_have_an_outdated_k
room.sync_members().await.expect("could not reload room members");
// Calling remove outdated seen knock request ids will remove the seen id
room.remove_outdated_seen_knock_requests_ids().await.expect("could not remove outdated seen knock request ids");
room.remove_outdated_seen_knock_requests_ids()
.await
.expect("could not remove outdated seen knock request ids");
let seen = room.get_seen_knock_request_ids().await.unwrap();
assert!(seen.is_empty());
}
#[async_test]
async fn test_subscribe_to_knock_requests_clears_seen_ids_on_member_reload() {
let server = MatrixMockServer::new().await;
let client = server.client_builder().build().await;
server.mock_room_state_encryption().plain().mount().await;
let room_id = room_id!("!a:b.c");
let f = EventFactory::new().room(room_id);
let user_id = user_id!("@alice:b.c");
let knock_event_id = event_id!("$alice-knock:b.c");
let knock_event = f
.event(RoomMemberEventContent::new(MembershipState::Knock))
.event_id(knock_event_id)
.sender(user_id)
.state_key(user_id)
.into_raw_timeline()
.cast();
server.mock_get_members().ok(vec![knock_event]).mock_once().mount().await;
let room = server.sync_joined_room(&client, room_id).await;
let (stream, handle) = room.subscribe_to_knock_requests().await.unwrap();
pin_mut!(stream);
// We receive an initial knock request from Alice
let initial = assert_next_with_timeout!(stream, 100);
assert_eq!(initial.len(), 1);
let knock_request = &initial[0];
assert_eq!(knock_request.event_id, knock_event_id);
assert!(!knock_request.is_seen);
// We then mark the knock request as seen
room.mark_knock_requests_as_seen(&[user_id.to_owned()]).await.unwrap();
// Now it's received again as seen
let seen = assert_next_with_timeout!(stream, 100);
assert_eq!(seen.len(), 1);
let seen_knock = &seen[0];
assert_eq!(seen_knock.event_id, knock_event_id);
assert!(seen_knock.is_seen);
// If we then load the members again and the previously knocking member is in
// another state now
let joined_event = f
.event(RoomMemberEventContent::new(MembershipState::Join))
.sender(user_id)
.state_key(user_id)
.into_raw_timeline()
.cast();
server.mock_get_members().ok(vec![joined_event]).mock_once().mount().await;
room.mark_members_missing();
room.sync_members().await.expect("could not reload room members");
// The knock requests are now empty because we have new member events
let updated_requests = assert_next_with_timeout!(stream, 100);
assert!(updated_requests.is_empty());
// There should be no other knock requests
assert_pending!(stream);
// Give some time for the seen ids purging to be done
sleep(Duration::from_millis(100)).await;
// The seen knock request id is no longer there because the associated knock
// request doesn't exist anymore
let seen_knock_request_ids = room
.get_seen_knock_request_ids()
.await
.expect("could not get current seen knock request ids");
assert!(seen_knock_request_ids.is_empty());
handle.abort();
}