From cb14813f847a153140233b19be81be00151ca8d2 Mon Sep 17 00:00:00 2001 From: Benjamin Bouvier Date: Tue, 26 Sep 2023 11:35:39 +0200 Subject: [PATCH] chore: address review comments --- crates/matrix-sdk-base/src/client.rs | 2 +- crates/matrix-sdk/CHANGELOG.md | 3 + crates/matrix-sdk/src/client/mod.rs | 8 +- .../matrix-sdk/src/deduplicating_handler.rs | 83 +++++++++++++++++++ crates/matrix-sdk/src/lib.rs | 1 + crates/matrix-sdk/src/room/mod.rs | 65 +-------------- 6 files changed, 95 insertions(+), 67 deletions(-) create mode 100644 crates/matrix-sdk/src/deduplicating_handler.rs diff --git a/crates/matrix-sdk-base/src/client.rs b/crates/matrix-sdk-base/src/client.rs index 42dcf4613..4fd7bb582 100644 --- a/crates/matrix-sdk-base/src/client.rs +++ b/crates/matrix-sdk-base/src/client.rs @@ -978,7 +978,7 @@ impl BaseClient { // TODO: All the actions in this loop used to be done only when the membership // event was not in the store before. This was changed with the new room API, // because e.g. leaving a room makes members events outdated and they need to be - // fetched by `get_members`. Therefore, they need to be overwritten here, even + // fetched by `members`. Therefore, they need to be overwritten here, even // if they exist. // However, this makes a new problem occur where setting the member events here // potentially races with the sync. diff --git a/crates/matrix-sdk/CHANGELOG.md b/crates/matrix-sdk/CHANGELOG.md index 923736501..c90671be3 100644 --- a/crates/matrix-sdk/CHANGELOG.md +++ b/crates/matrix-sdk/CHANGELOG.md @@ -23,6 +23,9 @@ Breaking changes: - Event handler closures now need to implement `FnOnce` + `Clone` instead of `Fn` - As a consequence, you no longer need to explicitly need to `clone` variables they capture before constructing an `async move {}` block inside +- `Room::sync_members` doesn't return the underlying Ruma response anymore. If you need to get the + room members, you can use `Room::members` or `Room::get_member` which will make sure that the + members are up to date. Bug fixes: diff --git a/crates/matrix-sdk/src/client/mod.rs b/crates/matrix-sdk/src/client/mod.rs index dbcbe8fda..15676f069 100644 --- a/crates/matrix-sdk/src/client/mod.rs +++ b/crates/matrix-sdk/src/client/mod.rs @@ -76,6 +76,7 @@ use crate::oidc::Oidc; use crate::{ authentication::{AuthCtx, AuthData, ReloadSessionCallback, SaveSessionCallback}, config::RequestConfig, + deduplicating_handler::DeduplicatingHandler, error::{HttpError, HttpResult}, event_handler::{ EventHandler, EventHandlerDropGuard, EventHandlerHandle, EventHandlerStore, SyncEvent, @@ -83,7 +84,6 @@ use crate::{ http_client::HttpClient, matrix_auth::MatrixAuth, notification_settings::NotificationSettings, - room::DeduplicatedRequestHandler, sync::{RoomUpdate, SyncResponse}, Account, AuthApi, AuthSession, Error, Media, RefreshTokenError, Result, Room, TransmissionProgress, @@ -162,16 +162,16 @@ pub(crate) struct ClientInner { /// Handler making sure we only have one group session sharing request in /// flight per room. #[cfg(feature = "e2e-encryption")] - pub(crate) group_session_deduplicated_handler: DeduplicatedRequestHandler, + pub(crate) group_session_deduplicated_handler: DeduplicatingHandler, /// Lock making sure we're only doing one key claim request at a time. #[cfg(feature = "e2e-encryption")] pub(crate) key_claim_lock: Mutex<()>, /// Handler to ensure that only one members request is running at a time, /// given a room. - pub(crate) members_request_deduplicated_handler: DeduplicatedRequestHandler, + pub(crate) members_request_deduplicated_handler: DeduplicatingHandler, /// Handler to ensure that only one encryption state request is running at a /// time, given a room. - pub(crate) encryption_state_deduplicated_handler: DeduplicatedRequestHandler, + pub(crate) encryption_state_deduplicated_handler: DeduplicatingHandler, pub(crate) typing_notice_times: DashMap, /// Event handlers. See `add_event_handler`. pub(crate) event_handlers: EventHandlerStore, diff --git a/crates/matrix-sdk/src/deduplicating_handler.rs b/crates/matrix-sdk/src/deduplicating_handler.rs new file mode 100644 index 000000000..994a18bdc --- /dev/null +++ b/crates/matrix-sdk/src/deduplicating_handler.rs @@ -0,0 +1,83 @@ +// Copyright 2023 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::{collections::BTreeMap, sync::Arc}; + +use futures_core::Future; +use matrix_sdk_common::SendOutsideWasm; +use tokio::sync::Mutex; + +use crate::{Error, Result}; + +type DeduplicatedRequestMap = Mutex>>>>; + +/// Handler that properly deduplicates function calls given a key uniquely +/// identifying the call kind, and will properly report error upwards in case +/// the concurrent call failed. +/// +/// This is handy for deduplicating per-room requests, but can also be used in +/// other contexts. +pub(crate) struct DeduplicatingHandler { + inflight: DeduplicatedRequestMap, +} + +impl Default for DeduplicatingHandler { + fn default() -> Self { + Self { inflight: Default::default() } + } +} + +impl DeduplicatingHandler { + pub async fn run<'a, F: Future> + SendOutsideWasm + 'a>( + &self, + key: Key, + code: F, + ) -> Result<()> { + let mut map = self.inflight.lock().await; + + if let Some(mutex) = map.get(&key).cloned() { + // If a request is already going on, await the release of the lock. + drop(map); + + return mutex.lock().await.map_err(|()| Error::ConcurrentRequestFailed); + } + + // Assume a successful request; we'll modify the result in case of failures + // later. + let request_mutex = Arc::new(Mutex::new(Ok(()))); + + map.insert(key.clone(), request_mutex.clone()); + + let mut request_guard = request_mutex.lock().await; + drop(map); + + match code.await { + Ok(()) => { + self.inflight.lock().await.remove(&key); + Ok(()) + } + + Err(err) => { + // Propagate the error state to other callers. + *request_guard = Err(()); + + // Remove the request from the in-flights set. + self.inflight.lock().await.remove(&key); + + // Bubble up the error. + Err(err) + } + } + } +} diff --git a/crates/matrix-sdk/src/lib.rs b/crates/matrix-sdk/src/lib.rs index fb0dca2ee..eb344f546 100644 --- a/crates/matrix-sdk/src/lib.rs +++ b/crates/matrix-sdk/src/lib.rs @@ -34,6 +34,7 @@ pub mod attachment; mod authentication; mod client; pub mod config; +mod deduplicating_handler; #[cfg(feature = "e2e-encryption")] pub mod encryption; mod error; diff --git a/crates/matrix-sdk/src/room/mod.rs b/crates/matrix-sdk/src/room/mod.rs index ec977e39f..604808788 100644 --- a/crates/matrix-sdk/src/room/mod.rs +++ b/crates/matrix-sdk/src/room/mod.rs @@ -1,9 +1,8 @@ //! High-level room API -use std::{borrow::Borrow, collections::BTreeMap, ops::Deref, sync::Arc, time::Duration}; +use std::{borrow::Borrow, collections::BTreeMap, ops::Deref, time::Duration}; use eyeball::SharedObservable; -use futures_core::Future; use matrix_sdk_base::{ deserialized_responses::{ RawAnySyncOrStrippedState, RawSyncOrStrippedState, SyncOrStrippedState, TimelineEvent, @@ -12,7 +11,7 @@ use matrix_sdk_base::{ store::StateStoreExt, RoomMemberships, StateChanges, }; -use matrix_sdk_common::{timeout::timeout, SendOutsideWasm}; +use matrix_sdk_common::timeout::timeout; use mime::Mime; #[cfg(feature = "e2e-encryption")] use ruma::events::{ @@ -67,7 +66,7 @@ use ruma::{ }; use serde::de::DeserializeOwned; use thiserror::Error; -use tokio::sync::{broadcast, Mutex}; +use tokio::sync::broadcast; use tracing::{debug, instrument, warn}; use crate::{ @@ -90,64 +89,6 @@ pub use self::{ messages::{Messages, MessagesOptions}, }; -type DeduplicatedRequestMap = Mutex>>>>; - -/// Handler that properly deduplicates requests to the same endpoint, and will -/// properly report error upwards in case the concurrent request failed. -pub(crate) struct DeduplicatedRequestHandler { - inflight: DeduplicatedRequestMap, -} - -impl Default for DeduplicatedRequestHandler { - fn default() -> Self { - Self { inflight: Default::default() } - } -} - -impl DeduplicatedRequestHandler { - async fn run<'a, F: Future> + SendOutsideWasm + 'a>( - &self, - key: Key, - code: F, - ) -> Result<()> { - let mut map = self.inflight.lock().await; - - if let Some(mutex) = map.get(&key).cloned() { - // If a request is already going on, await the release of the lock. - drop(map); - - return mutex.lock().await.map_err(|()| Error::ConcurrentRequestFailed); - } - - // Assume a successful request; we'll modify the result in case of failures - // later. - let request_mutex = Arc::new(Mutex::new(Ok(()))); - - map.insert(key.clone(), request_mutex.clone()); - - let mut request_guard = request_mutex.lock().await; - drop(map); - - match code.await { - Ok(()) => { - self.inflight.lock().await.remove(&key); - Ok(()) - } - - Err(err) => { - // Propagate the error state to other callers. - *request_guard = Err(()); - - // Remove the request from the in-flights set. - self.inflight.lock().await.remove(&key); - - // Bubble up the error. - Err(err) - } - } - } -} - /// A struct containing methods that are common for Joined, Invited and Left /// Rooms #[derive(Debug, Clone)]