mirror of
https://github.com/matrix-org/matrix-rust-sdk.git
synced 2026-05-07 15:33:45 -04:00
fix(sliding sync): don't take the sync lock while handling events
Fixes #5091.
This commit is contained in:
@@ -253,26 +253,31 @@ impl SlidingSync {
|
||||
// happens here.
|
||||
|
||||
let sync_response = {
|
||||
// Take the lock to avoid concurrent sliding syncs overwriting each other's room
|
||||
// infos.
|
||||
let _sync_lock = self.inner.client.base_client().sync_lock().lock().await;
|
||||
let response_processor = {
|
||||
// Take the lock to avoid concurrent sliding syncs overwriting each other's room
|
||||
// infos.
|
||||
let _sync_lock = self.inner.client.base_client().sync_lock().lock().await;
|
||||
|
||||
let mut response_processor =
|
||||
SlidingSyncResponseProcessor::new(self.inner.client.clone());
|
||||
let mut response_processor =
|
||||
SlidingSyncResponseProcessor::new(self.inner.client.clone());
|
||||
|
||||
#[cfg(feature = "e2e-encryption")]
|
||||
if self.is_e2ee_enabled() {
|
||||
response_processor.handle_encryption(&sliding_sync_response.extensions).await?
|
||||
}
|
||||
#[cfg(feature = "e2e-encryption")]
|
||||
if self.is_e2ee_enabled() {
|
||||
response_processor.handle_encryption(&sliding_sync_response.extensions).await?
|
||||
}
|
||||
|
||||
// Only handle the room's subsection of the response, if this sliding sync was
|
||||
// configured to do so.
|
||||
if must_process_rooms_response {
|
||||
response_processor
|
||||
.handle_room_response(&sliding_sync_response, &requested_required_states)
|
||||
.await?;
|
||||
}
|
||||
|
||||
// Only handle the room's subsection of the response, if this sliding sync was
|
||||
// configured to do so.
|
||||
if must_process_rooms_response {
|
||||
response_processor
|
||||
.handle_room_response(&sliding_sync_response, &requested_required_states)
|
||||
.await?;
|
||||
}
|
||||
};
|
||||
|
||||
// Release the lock before calling event handlers
|
||||
response_processor.process_and_take_response().await?
|
||||
};
|
||||
|
||||
@@ -895,11 +900,12 @@ mod tests {
|
||||
use matrix_sdk_base::RequestedRequiredStates;
|
||||
use matrix_sdk_test::async_test;
|
||||
use ruma::{
|
||||
api::client::error::ErrorKind, assign, owned_room_id, room_id, serde::Raw, uint,
|
||||
OwnedRoomId, TransactionId,
|
||||
api::client::error::ErrorKind, assign, events::direct::DirectEvent, owned_room_id, room_id,
|
||||
serde::Raw, uint, OwnedRoomId, TransactionId,
|
||||
};
|
||||
use serde::Deserialize;
|
||||
use serde_json::json;
|
||||
use tokio::sync::Barrier;
|
||||
use wiremock::{http::Method, Match, Mock, MockServer, Request, ResponseTemplate};
|
||||
|
||||
use super::{
|
||||
@@ -909,7 +915,8 @@ mod tests {
|
||||
SlidingSyncStickyParameters,
|
||||
};
|
||||
use crate::{
|
||||
sliding_sync::cache::restore_sliding_sync_state, test_utils::logged_in_client, Result,
|
||||
sliding_sync::cache::restore_sliding_sync_state, test_utils::logged_in_client, Client,
|
||||
Result,
|
||||
};
|
||||
|
||||
#[derive(Copy, Clone)]
|
||||
@@ -2764,4 +2771,62 @@ mod tests {
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[async_test]
|
||||
async fn test_sync_lock_is_released_before_calling_handlers() -> Result<()> {
|
||||
let server = MockServer::start().await;
|
||||
let client = logged_in_client(Some(server.uri())).await;
|
||||
|
||||
// Create a barrier to wait for event handler execution
|
||||
let barrier = Arc::new(Barrier::new(2));
|
||||
|
||||
let _sync_mock_guard = Mock::given(SlidingSyncMatcher)
|
||||
.respond_with(ResponseTemplate::new(200).set_body_json(json!({
|
||||
"pos": "0",
|
||||
"lists": {},
|
||||
"extensions": {
|
||||
"account_data": {
|
||||
"global": [
|
||||
{
|
||||
"type": "m.direct",
|
||||
"content": {
|
||||
"@de4dlockh0lmes:example.org": [
|
||||
"!mu5hr00m:example.org"
|
||||
]
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
}
|
||||
})))
|
||||
.mount_as_scoped(&server)
|
||||
.await;
|
||||
|
||||
let barrier_clone = barrier.clone();
|
||||
client.add_event_handler(|_: DirectEvent, client: Client| async move {
|
||||
//Check if the sync lock is free during handler execution
|
||||
let sync_lock = client.base_client().sync_lock().try_lock();
|
||||
assert!(sync_lock.is_ok(), "sync_lock wasn't free during handler execution");
|
||||
barrier_clone.wait().await;
|
||||
});
|
||||
|
||||
let sliding_sync = client
|
||||
.sliding_sync("test")?
|
||||
.add_list(SlidingSyncList::builder("thelist"))
|
||||
.with_account_data_extension(
|
||||
assign!(http::request::AccountData::default(), { enabled: Some(true) }),
|
||||
)
|
||||
.build()
|
||||
.await?;
|
||||
|
||||
let (sync, handler_execution) = tokio::join!(
|
||||
sliding_sync.sync_once(),
|
||||
tokio::time::timeout(Duration::from_secs(5), barrier.wait())
|
||||
);
|
||||
|
||||
assert!(sync.is_ok(), "Sync failed");
|
||||
assert!(handler_execution.is_ok(), "Handlers did not execute before timeout");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user