diff --git a/crates/matrix-sdk-crypto/src/machine.rs b/crates/matrix-sdk-crypto/src/machine.rs index 0f558b95d..f597863b0 100644 --- a/crates/matrix-sdk-crypto/src/machine.rs +++ b/crates/matrix-sdk-crypto/src/machine.rs @@ -1927,6 +1927,8 @@ impl OlmMachine { None => 0, }; + tracing::debug!("Initialising crypto store generation at {}", gen); + self.inner .store .set_custom_value(Self::CURRENT_GENERATION_STORE_KEY, gen.to_le_bytes().to_vec()) @@ -1939,19 +1941,32 @@ impl OlmMachine { /// If needs be, update the local and on-disk crypto store generation. /// - /// Returns true whether another user has modified the internal generation - /// counter, and as such we've incremented and updated it in the - /// database. - /// /// ## Requirements /// /// - This assumes that `initialize_crypto_store_generation` has been called /// beforehand. /// - This requires that the crypto store lock has been acquired. - pub async fn maintain_crypto_store_generation( - &self, + /// + /// # Arguments + /// + /// * `generation` - The in-memory generation counter (or rather, the + /// `Mutex` wrapping it). This defines the "expected" generation on entry, + /// and, if we determine an update is needed, is updated to hold the "new" + /// generation. + /// + /// # Returns + /// + /// A tuple containing: + /// + /// * A `bool`, set to `true` if another process has updated the generation + /// number in the `Store` since our expected value, and as such we've + /// incremented and updated it in the database. Otherwise, `false`. + /// + /// * The (possibly updated) generation counter. + pub async fn maintain_crypto_store_generation<'a>( + &'a self, generation: &Mutex>, - ) -> StoreResult { + ) -> StoreResult<(bool, u64)> { let mut gen_guard = generation.lock().await; // The database value must be there: @@ -1973,10 +1988,10 @@ impl OlmMachine { CryptoStoreError::InvalidLockGeneration("invalid format".to_owned()) })?); - let expected_gen = match gen_guard.as_ref() { + let new_gen = match gen_guard.as_ref() { Some(expected_gen) => { if actual_gen == *expected_gen { - return Ok(false); + return Ok((false, actual_gen)); } // Increment the biggest, and store it everywhere. actual_gen.max(*expected_gen).wrapping_add(1) @@ -1992,22 +2007,19 @@ impl OlmMachine { "Crypto store generation mismatch: previously known was {:?}, actual is {:?}, next is {}", *gen_guard, actual_gen, - expected_gen + new_gen ); // Update known value. - *gen_guard = Some(expected_gen); + *gen_guard = Some(new_gen); // Update value in database. self.inner .store - .set_custom_value( - Self::CURRENT_GENERATION_STORE_KEY, - expected_gen.to_le_bytes().to_vec(), - ) + .set_custom_value(Self::CURRENT_GENERATION_STORE_KEY, new_gen.to_le_bytes().to_vec()) .await?; - Ok(true) + Ok((true, new_gen)) } /// Manage dehydrated devices. diff --git a/crates/matrix-sdk-ui/src/encryption_sync_service.rs b/crates/matrix-sdk-ui/src/encryption_sync_service.rs index 56e2051d6..c4ca039f0 100644 --- a/crates/matrix-sdk-ui/src/encryption_sync_service.rs +++ b/crates/matrix-sdk-ui/src/encryption_sync_service.rs @@ -26,7 +26,7 @@ //! //! [NSE]: https://developer.apple.com/documentation/usernotifications/unnotificationserviceextension -use std::time::Duration; +use std::{pin::Pin, time::Duration}; use async_stream::stream; use futures_core::stream::Stream; @@ -34,7 +34,7 @@ use futures_util::{pin_mut, StreamExt}; use matrix_sdk::{Client, SlidingSync, LEASE_DURATION_MS}; use ruma::{api::client::sync::sync_events::v4, assign}; use tokio::sync::OwnedMutexGuard; -use tracing::{debug, trace}; +use tracing::{debug, instrument, trace, Span}; /// Unit type representing a permit to *use* an [`EncryptionSyncService`]. /// @@ -143,6 +143,7 @@ impl EncryptionSyncService { /// Note: the [`EncryptionSyncPermit`] parameter ensures that there's at /// most one encryption sync running at any time. See its documentation /// for more details. + #[instrument(skip_all, fields(store_generation))] pub async fn run_fixed_iterations( self, num_iterations: u8, @@ -152,7 +153,7 @@ impl EncryptionSyncService { pin_mut!(sync); - let _lock_guard = if self.with_locking { + let lock_guard = if self.with_locking { let mut lock_guard = self.client.encryption().try_lock_store_once().await.map_err(Error::LockError)?; @@ -192,6 +193,8 @@ impl EncryptionSyncService { None }; + Span::current().record("store_generation", lock_guard.map(|guard| guard.generation())); + for _ in 0..num_iterations { match sync.next().await { Some(Ok(update_summary)) => { @@ -241,17 +244,7 @@ impl EncryptionSyncService { pin_mut!(sync); loop { - let guard = if self.with_locking { - self.client - .encryption() - .spin_lock_store(Some(60000)) - .await - .map_err(Error::LockError)? - } else { - None - }; - - match sync.next().await { + match self.next_sync_with_lock(&mut sync).await? { Some(Ok(update_summary)) => { // This API is only concerned with the e2ee and to-device extensions. // Warn if anything weird has been received from the proxy. @@ -264,18 +257,12 @@ impl EncryptionSyncService { // Cool cool, let's do it again. trace!("Encryption sync received an update!"); - - drop(guard); - yield Ok(()); continue; } Some(Err(err)) => { trace!("Encryption sync stopped because of an error: {err:#}"); - - drop(guard); - yield Err(Error::SlidingSync(err)); break; } @@ -289,6 +276,24 @@ impl EncryptionSyncService { }) } + /// Helper function for `sync`. Take the cross-process store lock, and call + /// `sync.next()` + #[instrument(skip_all, fields(store_generation))] + async fn next_sync_with_lock( + &self, + sync: &mut Pin<&mut impl Stream>, + ) -> Result, Error> { + let guard = if self.with_locking { + self.client.encryption().spin_lock_store(Some(60000)).await.map_err(Error::LockError)? + } else { + None + }; + + Span::current().record("store_generation", guard.map(|guard| guard.generation())); + + Ok(sync.next().await) + } + /// Requests that the underlying sliding sync be stopped. /// /// This will unlock the cross-process lock, if taken. diff --git a/crates/matrix-sdk/src/encryption/mod.rs b/crates/matrix-sdk/src/encryption/mod.rs index e14251d39..5ce646359 100644 --- a/crates/matrix-sdk/src/encryption/mod.rs +++ b/crates/matrix-sdk/src/encryption/mod.rs @@ -201,6 +201,20 @@ pub enum VerificationState { Unverified, } +/// Wraps together a `CrossProcessLockStoreGuard` and a generation number. +#[derive(Debug)] +pub struct CrossProcessLockStoreGuardWithGeneration { + _guard: CrossProcessStoreLockGuard, + generation: u64, +} + +impl CrossProcessLockStoreGuardWithGeneration { + /// Return the Crypto Store generation associated with this store lock. + pub fn generation(&self) -> u64 { + self.generation + } +} + impl Client { pub(crate) async fn olm_machine(&self) -> RwLockReadGuard<'_, Option> { self.base_client().olm_machine().await @@ -1247,21 +1261,30 @@ impl Encryption { /// Maybe reload the `OlmMachine` after acquiring the lock for the first /// time. - async fn on_lock_newly_acquired(&self) -> Result<(), Error> { + /// + /// Returns the current generation number. + async fn on_lock_newly_acquired(&self) -> Result { let olm_machine_guard = self.client.olm_machine().await; if let Some(olm_machine) = olm_machine_guard.as_ref() { - // If the crypto store generation has changed, - if olm_machine + let (new_gen, generation_number) = olm_machine .maintain_crypto_store_generation(&self.client.locks().crypto_store_generation) - .await? - { + .await?; + // If the crypto store generation has changed, + if new_gen { // (get rid of the reference to the current crypto store first) drop(olm_machine_guard); // Recreate the OlmMachine. self.client.base_client().regenerate_olm().await?; } + Ok(generation_number) + } else { + // XXX: not sure this is reachable. Seems like the OlmMachine should always have + // been initialised by the time we get here. Ideally we'd panic, or return an + // error, but for now I'm just adding some logging to check if it + // happens, and returning the magic number 0. + warn!("Encryption::on_lock_newly_acquired: called before OlmMachine initialised"); + Ok(0) } - Ok(()) } /// If a lock was created with [`Self::enable_cross_process_store_lock`], @@ -1272,13 +1295,13 @@ impl Encryption { pub async fn spin_lock_store( &self, max_backoff: Option, - ) -> Result, Error> { + ) -> Result, Error> { if let Some(lock) = self.client.locks().cross_process_crypto_store_lock.get() { let guard = lock.spin_lock(max_backoff).await?; - self.on_lock_newly_acquired().await?; + let generation = self.on_lock_newly_acquired().await?; - Ok(Some(guard)) + Ok(Some(CrossProcessLockStoreGuardWithGeneration { _guard: guard, generation })) } else { Ok(None) } @@ -1288,15 +1311,19 @@ impl Encryption { /// attempts to lock it once. /// /// Returns a guard to the lock, if it was obtained. - pub async fn try_lock_store_once(&self) -> Result, Error> { + pub async fn try_lock_store_once( + &self, + ) -> Result, Error> { if let Some(lock) = self.client.locks().cross_process_crypto_store_lock.get() { let maybe_guard = lock.try_lock_once().await?; - if maybe_guard.is_some() { - self.on_lock_newly_acquired().await?; - } + let Some(guard) = maybe_guard else { + return Ok(None); + }; - Ok(maybe_guard) + let generation = self.on_lock_newly_acquired().await?; + + Ok(Some(CrossProcessLockStoreGuardWithGeneration { _guard: guard, generation })) } else { Ok(None) } diff --git a/crates/matrix-sdk/src/room/mod.rs b/crates/matrix-sdk/src/room/mod.rs index bc9fa0577..d3bdafe93 100644 --- a/crates/matrix-sdk/src/room/mod.rs +++ b/crates/matrix-sdk/src/room/mod.rs @@ -1448,12 +1448,13 @@ impl Room { // TODO: expose this publicly so people can pre-share a group session if // e.g. a user starts to type a message for a room. #[cfg(feature = "e2e-encryption")] - #[instrument(skip_all, fields(room_id = ?self.room_id()))] + #[instrument(skip_all, fields(room_id = ?self.room_id(), store_generation))] async fn preshare_room_key(&self) -> Result<()> { self.ensure_room_joined()?; // Take and release the lock on the store, if needs be. - let _guard = self.client.encryption().spin_lock_store(Some(60000)).await?; + let guard = self.client.encryption().spin_lock_store(Some(60000)).await?; + tracing::Span::current().record("store_generation", guard.map(|guard| guard.generation())); self.client .locks() diff --git a/crates/matrix-sdk/src/sliding_sync/mod.rs b/crates/matrix-sdk/src/sliding_sync/mod.rs index 9ffe486e3..438fff0c7 100644 --- a/crates/matrix-sdk/src/sliding_sync/mod.rs +++ b/crates/matrix-sdk/src/sliding_sync/mod.rs @@ -722,14 +722,11 @@ impl SlidingSync { pub fn sync(&self) -> impl Stream> + '_ { debug!("Starting sync stream"); - let sync_span = Span::current(); let mut internal_channel_receiver = self.inner.internal_channel.subscribe(); stream! { loop { - sync_span.in_scope(|| { - debug!("Sync stream is running"); - }); + debug!("Sync stream is running"); select! { biased; @@ -737,9 +734,7 @@ impl SlidingSync { internal_message = internal_channel_receiver.recv() => { use SlidingSyncInternalMessage::*; - sync_span.in_scope(|| { - debug!(?internal_message, "Sync stream has received an internal message"); - }); + debug!(?internal_message, "Sync stream has received an internal message"); match internal_message { Err(_) | Ok(SyncLoopStop) => { @@ -752,7 +747,7 @@ impl SlidingSync { } } - update_summary = self.sync_once().instrument(sync_span.clone()) => { + update_summary = self.sync_once() => { match update_summary { Ok(updates) => { yield Ok(updates); @@ -767,9 +762,7 @@ impl SlidingSync { Err(error) => { if error.client_api_error_kind() == Some(&ErrorKind::UnknownPos) { // The Sliding Sync session has expired. Let's reset `pos` and sticky parameters. - sync_span.in_scope(|| async { - self.expire_session().await; - }).await; + self.expire_session().await; } yield Err(error);