chore: address review comments

This commit is contained in:
Benjamin Bouvier
2023-09-26 11:35:39 +02:00
parent e9362f75f2
commit cb14813f84
6 changed files with 95 additions and 67 deletions

View File

@@ -978,7 +978,7 @@ impl BaseClient {
// TODO: All the actions in this loop used to be done only when the membership
// event was not in the store before. This was changed with the new room API,
// because e.g. leaving a room makes members events outdated and they need to be
// fetched by `get_members`. Therefore, they need to be overwritten here, even
// fetched by `members`. Therefore, they need to be overwritten here, even
// if they exist.
// However, this makes a new problem occur where setting the member events here
// potentially races with the sync.

View File

@@ -23,6 +23,9 @@ Breaking changes:
- Event handler closures now need to implement `FnOnce` + `Clone` instead of `Fn`
- As a consequence, you no longer need to explicitly need to `clone` variables they capture
before constructing an `async move {}` block inside
- `Room::sync_members` doesn't return the underlying Ruma response anymore. If you need to get the
room members, you can use `Room::members` or `Room::get_member` which will make sure that the
members are up to date.
Bug fixes:

View File

@@ -76,6 +76,7 @@ use crate::oidc::Oidc;
use crate::{
authentication::{AuthCtx, AuthData, ReloadSessionCallback, SaveSessionCallback},
config::RequestConfig,
deduplicating_handler::DeduplicatingHandler,
error::{HttpError, HttpResult},
event_handler::{
EventHandler, EventHandlerDropGuard, EventHandlerHandle, EventHandlerStore, SyncEvent,
@@ -83,7 +84,6 @@ use crate::{
http_client::HttpClient,
matrix_auth::MatrixAuth,
notification_settings::NotificationSettings,
room::DeduplicatedRequestHandler,
sync::{RoomUpdate, SyncResponse},
Account, AuthApi, AuthSession, Error, Media, RefreshTokenError, Result, Room,
TransmissionProgress,
@@ -162,16 +162,16 @@ pub(crate) struct ClientInner {
/// Handler making sure we only have one group session sharing request in
/// flight per room.
#[cfg(feature = "e2e-encryption")]
pub(crate) group_session_deduplicated_handler: DeduplicatedRequestHandler<OwnedRoomId>,
pub(crate) group_session_deduplicated_handler: DeduplicatingHandler<OwnedRoomId>,
/// Lock making sure we're only doing one key claim request at a time.
#[cfg(feature = "e2e-encryption")]
pub(crate) key_claim_lock: Mutex<()>,
/// Handler to ensure that only one members request is running at a time,
/// given a room.
pub(crate) members_request_deduplicated_handler: DeduplicatedRequestHandler<OwnedRoomId>,
pub(crate) members_request_deduplicated_handler: DeduplicatingHandler<OwnedRoomId>,
/// Handler to ensure that only one encryption state request is running at a
/// time, given a room.
pub(crate) encryption_state_deduplicated_handler: DeduplicatedRequestHandler<OwnedRoomId>,
pub(crate) encryption_state_deduplicated_handler: DeduplicatingHandler<OwnedRoomId>,
pub(crate) typing_notice_times: DashMap<OwnedRoomId, Instant>,
/// Event handlers. See `add_event_handler`.
pub(crate) event_handlers: EventHandlerStore,

View File

@@ -0,0 +1,83 @@
// Copyright 2023 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::{collections::BTreeMap, sync::Arc};
use futures_core::Future;
use matrix_sdk_common::SendOutsideWasm;
use tokio::sync::Mutex;
use crate::{Error, Result};
type DeduplicatedRequestMap<Key> = Mutex<BTreeMap<Key, Arc<Mutex<Result<(), ()>>>>>;
/// Handler that properly deduplicates function calls given a key uniquely
/// identifying the call kind, and will properly report error upwards in case
/// the concurrent call failed.
///
/// This is handy for deduplicating per-room requests, but can also be used in
/// other contexts.
pub(crate) struct DeduplicatingHandler<Key> {
inflight: DeduplicatedRequestMap<Key>,
}
impl<Key> Default for DeduplicatingHandler<Key> {
fn default() -> Self {
Self { inflight: Default::default() }
}
}
impl<Key: Clone + Ord + std::hash::Hash> DeduplicatingHandler<Key> {
pub async fn run<'a, F: Future<Output = Result<()>> + SendOutsideWasm + 'a>(
&self,
key: Key,
code: F,
) -> Result<()> {
let mut map = self.inflight.lock().await;
if let Some(mutex) = map.get(&key).cloned() {
// If a request is already going on, await the release of the lock.
drop(map);
return mutex.lock().await.map_err(|()| Error::ConcurrentRequestFailed);
}
// Assume a successful request; we'll modify the result in case of failures
// later.
let request_mutex = Arc::new(Mutex::new(Ok(())));
map.insert(key.clone(), request_mutex.clone());
let mut request_guard = request_mutex.lock().await;
drop(map);
match code.await {
Ok(()) => {
self.inflight.lock().await.remove(&key);
Ok(())
}
Err(err) => {
// Propagate the error state to other callers.
*request_guard = Err(());
// Remove the request from the in-flights set.
self.inflight.lock().await.remove(&key);
// Bubble up the error.
Err(err)
}
}
}
}

View File

@@ -34,6 +34,7 @@ pub mod attachment;
mod authentication;
mod client;
pub mod config;
mod deduplicating_handler;
#[cfg(feature = "e2e-encryption")]
pub mod encryption;
mod error;

View File

@@ -1,9 +1,8 @@
//! High-level room API
use std::{borrow::Borrow, collections::BTreeMap, ops::Deref, sync::Arc, time::Duration};
use std::{borrow::Borrow, collections::BTreeMap, ops::Deref, time::Duration};
use eyeball::SharedObservable;
use futures_core::Future;
use matrix_sdk_base::{
deserialized_responses::{
RawAnySyncOrStrippedState, RawSyncOrStrippedState, SyncOrStrippedState, TimelineEvent,
@@ -12,7 +11,7 @@ use matrix_sdk_base::{
store::StateStoreExt,
RoomMemberships, StateChanges,
};
use matrix_sdk_common::{timeout::timeout, SendOutsideWasm};
use matrix_sdk_common::timeout::timeout;
use mime::Mime;
#[cfg(feature = "e2e-encryption")]
use ruma::events::{
@@ -67,7 +66,7 @@ use ruma::{
};
use serde::de::DeserializeOwned;
use thiserror::Error;
use tokio::sync::{broadcast, Mutex};
use tokio::sync::broadcast;
use tracing::{debug, instrument, warn};
use crate::{
@@ -90,64 +89,6 @@ pub use self::{
messages::{Messages, MessagesOptions},
};
type DeduplicatedRequestMap<Key> = Mutex<BTreeMap<Key, Arc<Mutex<Result<(), ()>>>>>;
/// Handler that properly deduplicates requests to the same endpoint, and will
/// properly report error upwards in case the concurrent request failed.
pub(crate) struct DeduplicatedRequestHandler<Key> {
inflight: DeduplicatedRequestMap<Key>,
}
impl<Key> Default for DeduplicatedRequestHandler<Key> {
fn default() -> Self {
Self { inflight: Default::default() }
}
}
impl<Key: Clone + Ord + std::hash::Hash> DeduplicatedRequestHandler<Key> {
async fn run<'a, F: Future<Output = Result<()>> + SendOutsideWasm + 'a>(
&self,
key: Key,
code: F,
) -> Result<()> {
let mut map = self.inflight.lock().await;
if let Some(mutex) = map.get(&key).cloned() {
// If a request is already going on, await the release of the lock.
drop(map);
return mutex.lock().await.map_err(|()| Error::ConcurrentRequestFailed);
}
// Assume a successful request; we'll modify the result in case of failures
// later.
let request_mutex = Arc::new(Mutex::new(Ok(())));
map.insert(key.clone(), request_mutex.clone());
let mut request_guard = request_mutex.lock().await;
drop(map);
match code.await {
Ok(()) => {
self.inflight.lock().await.remove(&key);
Ok(())
}
Err(err) => {
// Propagate the error state to other callers.
*request_guard = Err(());
// Remove the request from the in-flights set.
self.inflight.lock().await.remove(&key);
// Bubble up the error.
Err(err)
}
}
}
}
/// A struct containing methods that are common for Joined, Invited and Left
/// Rooms
#[derive(Debug, Clone)]