From ad63d28cfadcd003755de9216cb539c801919824 Mon Sep 17 00:00:00 2001 From: Benjamin Bouvier Date: Tue, 11 Jun 2024 16:57:25 +0200 Subject: [PATCH] deduplicating handler: use a new `QueryState` data structure --- .../matrix-sdk/src/deduplicating_handler.rs | 41 ++++++++++++++----- 1 file changed, 30 insertions(+), 11 deletions(-) diff --git a/crates/matrix-sdk/src/deduplicating_handler.rs b/crates/matrix-sdk/src/deduplicating_handler.rs index cf8510b8c..db0afbbb8 100644 --- a/crates/matrix-sdk/src/deduplicating_handler.rs +++ b/crates/matrix-sdk/src/deduplicating_handler.rs @@ -24,7 +24,20 @@ use tokio::sync::Mutex; use crate::{Error, Result}; -type DeduplicatedRequestMap = Mutex>>>>>; +/// State machine for the state of a query deduplicated by the +/// [`DeduplicatingHandler`]. +enum QueryState { + /// The query hasn't completed yet. This doesn't mean it hasn't *started* + /// yet, but rather that it couldn't get to completion: some + /// intermediate steps might have run. + NotFinishedYet, + /// The query has completed with an `Ok` result. + Success, + /// The query has completed with an `Err` result. + Failure, +} + +type DeduplicatedRequestMap = Mutex>>>; /// Handler that properly deduplicates function calls given a key uniquely /// identifying the call kind, and will properly report error upwards in case @@ -62,45 +75,51 @@ impl DeduplicatingHandler { let mut request_guard = request_mutex.lock().await; return match *request_guard { - Some(Ok(())) => { + QueryState::Success => { // The query completed with a success: forward this success. Ok(()) } - Some(Err(())) => { + QueryState::Failure => { // The query completed with an error, but we don't know what it is; report // there was an error. Err(Error::ConcurrentRequestFailed) } - None => { - // The query hasn't completed, it could have been cancelled. Repeat it. - self.run_code(key, code, &mut *request_guard).await + QueryState::NotFinishedYet => { + // If we could take a hold onto the mutex without it being in the success or + // failure state, then the query hasn't completed (e.g. it could have been + // cancelled). Repeat it. + // + // Note: there might be other waiters for the deduplicated result; they will + // still be waiting for the mutex above, since the mutex is obtained for at + // most one holder at the same time. + self.run_code(key, code, &mut request_guard).await } }; } // Start at the `None` state to indicate we haven't completed the request yet. - let request_mutex = Arc::new(Mutex::new(None)); + let request_mutex = Arc::new(Mutex::new(QueryState::NotFinishedYet)); map.insert(key.clone(), request_mutex.clone()); let mut request_guard = request_mutex.lock().await; drop(map); - self.run_code(key, code, &mut *request_guard).await + self.run_code(key, code, &mut request_guard).await } async fn run_code<'a, F: Future> + SendOutsideWasm + 'a>( &self, key: Key, code: F, - result: &mut Option>, + result: &mut QueryState, ) -> Result<()> { match code.await { Ok(()) => { // Mark the request as completed. - *result = Some(Ok(())); + *result = QueryState::Success; self.inflight.lock().await.remove(&key); @@ -109,7 +128,7 @@ impl DeduplicatingHandler { Err(err) => { // Propagate the error state to other callers. - *result = Some(Err(())); + *result = QueryState::Failure; // Remove the request from the in-flights set. self.inflight.lock().await.remove(&key);