From 5e0704a7c7718ec3ffe2a29a399dbde4fa56ff3e Mon Sep 17 00:00:00 2001 From: aeoncl Date: Sat, 24 May 2025 10:38:07 +0200 Subject: [PATCH] fix(sliding sync): don't take the sync lock while handling events Fixes #5091. --- crates/matrix-sdk/src/sliding_sync/mod.rs | 101 ++++++++++++++++++---- 1 file changed, 83 insertions(+), 18 deletions(-) diff --git a/crates/matrix-sdk/src/sliding_sync/mod.rs b/crates/matrix-sdk/src/sliding_sync/mod.rs index e60978324..fd2fce5dd 100644 --- a/crates/matrix-sdk/src/sliding_sync/mod.rs +++ b/crates/matrix-sdk/src/sliding_sync/mod.rs @@ -253,26 +253,31 @@ impl SlidingSync { // happens here. let sync_response = { - // Take the lock to avoid concurrent sliding syncs overwriting each other's room - // infos. - let _sync_lock = self.inner.client.base_client().sync_lock().lock().await; + let response_processor = { + // Take the lock to avoid concurrent sliding syncs overwriting each other's room + // infos. + let _sync_lock = self.inner.client.base_client().sync_lock().lock().await; - let mut response_processor = - SlidingSyncResponseProcessor::new(self.inner.client.clone()); + let mut response_processor = + SlidingSyncResponseProcessor::new(self.inner.client.clone()); - #[cfg(feature = "e2e-encryption")] - if self.is_e2ee_enabled() { - response_processor.handle_encryption(&sliding_sync_response.extensions).await? - } + #[cfg(feature = "e2e-encryption")] + if self.is_e2ee_enabled() { + response_processor.handle_encryption(&sliding_sync_response.extensions).await? + } + + // Only handle the room's subsection of the response, if this sliding sync was + // configured to do so. + if must_process_rooms_response { + response_processor + .handle_room_response(&sliding_sync_response, &requested_required_states) + .await?; + } - // Only handle the room's subsection of the response, if this sliding sync was - // configured to do so. - if must_process_rooms_response { response_processor - .handle_room_response(&sliding_sync_response, &requested_required_states) - .await?; - } + }; + // Release the lock before calling event handlers response_processor.process_and_take_response().await? }; @@ -895,11 +900,12 @@ mod tests { use matrix_sdk_base::RequestedRequiredStates; use matrix_sdk_test::async_test; use ruma::{ - api::client::error::ErrorKind, assign, owned_room_id, room_id, serde::Raw, uint, - OwnedRoomId, TransactionId, + api::client::error::ErrorKind, assign, events::direct::DirectEvent, owned_room_id, room_id, + serde::Raw, uint, OwnedRoomId, TransactionId, }; use serde::Deserialize; use serde_json::json; + use tokio::sync::Barrier; use wiremock::{http::Method, Match, Mock, MockServer, Request, ResponseTemplate}; use super::{ @@ -909,7 +915,8 @@ mod tests { SlidingSyncStickyParameters, }; use crate::{ - sliding_sync::cache::restore_sliding_sync_state, test_utils::logged_in_client, Result, + sliding_sync::cache::restore_sliding_sync_state, test_utils::logged_in_client, Client, + Result, }; #[derive(Copy, Clone)] @@ -2764,4 +2771,62 @@ mod tests { Ok(()) } + + #[async_test] + async fn test_sync_lock_is_released_before_calling_handlers() -> Result<()> { + let server = MockServer::start().await; + let client = logged_in_client(Some(server.uri())).await; + + // Create a barrier to wait for event handler execution + let barrier = Arc::new(Barrier::new(2)); + + let _sync_mock_guard = Mock::given(SlidingSyncMatcher) + .respond_with(ResponseTemplate::new(200).set_body_json(json!({ + "pos": "0", + "lists": {}, + "extensions": { + "account_data": { + "global": [ + { + "type": "m.direct", + "content": { + "@de4dlockh0lmes:example.org": [ + "!mu5hr00m:example.org" + ] + } + } + ] + } + } + }))) + .mount_as_scoped(&server) + .await; + + let barrier_clone = barrier.clone(); + client.add_event_handler(|_: DirectEvent, client: Client| async move { + //Check if the sync lock is free during handler execution + let sync_lock = client.base_client().sync_lock().try_lock(); + assert!(sync_lock.is_ok(), "sync_lock wasn't free during handler execution"); + barrier_clone.wait().await; + }); + + let sliding_sync = client + .sliding_sync("test")? + .add_list(SlidingSyncList::builder("thelist")) + .with_account_data_extension( + assign!(http::request::AccountData::default(), { enabled: Some(true) }), + ) + .build() + .await?; + + let (sync, handler_execution) = tokio::join!( + sliding_sync.sync_once(), + tokio::time::timeout(Duration::from_secs(5), barrier.wait()) + ); + + assert!(sync.is_ok(), "Sync failed"); + assert!(handler_execution.is_ok(), "Handlers did not execute before timeout"); + + Ok(()) + } }