From 5e10ccc2481e24f46da192c876e755d44dda02e8 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Wed, 13 Mar 2024 17:09:57 +0000 Subject: [PATCH] Add more logging for crypto store generation counter (#3207) It's a bit unclear whether the crypto-store generation counter is doing the right thing in terms of causing us to reload the OlmMachine. There is a suspicion that things might be keeping hold of references to the old OlmMachine. This PR attempts to add the generation number to the logging for any operations that hold the cross-process lock. It's obviously not bulletproof: for example, it is possible for the OlmMachine to be replaced without holding the lock; but hopefully this will at least help us understand what's going on. --- crates/matrix-sdk-crypto/src/machine.rs | 44 +++++++++------ .../src/encryption_sync_service.rs | 45 ++++++++------- crates/matrix-sdk/src/encryption/mod.rs | 55 ++++++++++++++----- crates/matrix-sdk/src/room/mod.rs | 5 +- crates/matrix-sdk/src/sliding_sync/mod.rs | 15 ++--- 5 files changed, 101 insertions(+), 63 deletions(-) diff --git a/crates/matrix-sdk-crypto/src/machine.rs b/crates/matrix-sdk-crypto/src/machine.rs index 0f558b95d..f597863b0 100644 --- a/crates/matrix-sdk-crypto/src/machine.rs +++ b/crates/matrix-sdk-crypto/src/machine.rs @@ -1927,6 +1927,8 @@ impl OlmMachine { None => 0, }; + tracing::debug!("Initialising crypto store generation at {}", gen); + self.inner .store .set_custom_value(Self::CURRENT_GENERATION_STORE_KEY, gen.to_le_bytes().to_vec()) @@ -1939,19 +1941,32 @@ impl OlmMachine { /// If needs be, update the local and on-disk crypto store generation. /// - /// Returns true whether another user has modified the internal generation - /// counter, and as such we've incremented and updated it in the - /// database. - /// /// ## Requirements /// /// - This assumes that `initialize_crypto_store_generation` has been called /// beforehand. /// - This requires that the crypto store lock has been acquired. - pub async fn maintain_crypto_store_generation( - &self, + /// + /// # Arguments + /// + /// * `generation` - The in-memory generation counter (or rather, the + /// `Mutex` wrapping it). This defines the "expected" generation on entry, + /// and, if we determine an update is needed, is updated to hold the "new" + /// generation. + /// + /// # Returns + /// + /// A tuple containing: + /// + /// * A `bool`, set to `true` if another process has updated the generation + /// number in the `Store` since our expected value, and as such we've + /// incremented and updated it in the database. Otherwise, `false`. + /// + /// * The (possibly updated) generation counter. + pub async fn maintain_crypto_store_generation<'a>( + &'a self, generation: &Mutex>, - ) -> StoreResult { + ) -> StoreResult<(bool, u64)> { let mut gen_guard = generation.lock().await; // The database value must be there: @@ -1973,10 +1988,10 @@ impl OlmMachine { CryptoStoreError::InvalidLockGeneration("invalid format".to_owned()) })?); - let expected_gen = match gen_guard.as_ref() { + let new_gen = match gen_guard.as_ref() { Some(expected_gen) => { if actual_gen == *expected_gen { - return Ok(false); + return Ok((false, actual_gen)); } // Increment the biggest, and store it everywhere. actual_gen.max(*expected_gen).wrapping_add(1) @@ -1992,22 +2007,19 @@ impl OlmMachine { "Crypto store generation mismatch: previously known was {:?}, actual is {:?}, next is {}", *gen_guard, actual_gen, - expected_gen + new_gen ); // Update known value. - *gen_guard = Some(expected_gen); + *gen_guard = Some(new_gen); // Update value in database. self.inner .store - .set_custom_value( - Self::CURRENT_GENERATION_STORE_KEY, - expected_gen.to_le_bytes().to_vec(), - ) + .set_custom_value(Self::CURRENT_GENERATION_STORE_KEY, new_gen.to_le_bytes().to_vec()) .await?; - Ok(true) + Ok((true, new_gen)) } /// Manage dehydrated devices. diff --git a/crates/matrix-sdk-ui/src/encryption_sync_service.rs b/crates/matrix-sdk-ui/src/encryption_sync_service.rs index 56e2051d6..c4ca039f0 100644 --- a/crates/matrix-sdk-ui/src/encryption_sync_service.rs +++ b/crates/matrix-sdk-ui/src/encryption_sync_service.rs @@ -26,7 +26,7 @@ //! //! [NSE]: https://developer.apple.com/documentation/usernotifications/unnotificationserviceextension -use std::time::Duration; +use std::{pin::Pin, time::Duration}; use async_stream::stream; use futures_core::stream::Stream; @@ -34,7 +34,7 @@ use futures_util::{pin_mut, StreamExt}; use matrix_sdk::{Client, SlidingSync, LEASE_DURATION_MS}; use ruma::{api::client::sync::sync_events::v4, assign}; use tokio::sync::OwnedMutexGuard; -use tracing::{debug, trace}; +use tracing::{debug, instrument, trace, Span}; /// Unit type representing a permit to *use* an [`EncryptionSyncService`]. /// @@ -143,6 +143,7 @@ impl EncryptionSyncService { /// Note: the [`EncryptionSyncPermit`] parameter ensures that there's at /// most one encryption sync running at any time. See its documentation /// for more details. + #[instrument(skip_all, fields(store_generation))] pub async fn run_fixed_iterations( self, num_iterations: u8, @@ -152,7 +153,7 @@ impl EncryptionSyncService { pin_mut!(sync); - let _lock_guard = if self.with_locking { + let lock_guard = if self.with_locking { let mut lock_guard = self.client.encryption().try_lock_store_once().await.map_err(Error::LockError)?; @@ -192,6 +193,8 @@ impl EncryptionSyncService { None }; + Span::current().record("store_generation", lock_guard.map(|guard| guard.generation())); + for _ in 0..num_iterations { match sync.next().await { Some(Ok(update_summary)) => { @@ -241,17 +244,7 @@ impl EncryptionSyncService { pin_mut!(sync); loop { - let guard = if self.with_locking { - self.client - .encryption() - .spin_lock_store(Some(60000)) - .await - .map_err(Error::LockError)? - } else { - None - }; - - match sync.next().await { + match self.next_sync_with_lock(&mut sync).await? { Some(Ok(update_summary)) => { // This API is only concerned with the e2ee and to-device extensions. // Warn if anything weird has been received from the proxy. @@ -264,18 +257,12 @@ impl EncryptionSyncService { // Cool cool, let's do it again. trace!("Encryption sync received an update!"); - - drop(guard); - yield Ok(()); continue; } Some(Err(err)) => { trace!("Encryption sync stopped because of an error: {err:#}"); - - drop(guard); - yield Err(Error::SlidingSync(err)); break; } @@ -289,6 +276,24 @@ impl EncryptionSyncService { }) } + /// Helper function for `sync`. Take the cross-process store lock, and call + /// `sync.next()` + #[instrument(skip_all, fields(store_generation))] + async fn next_sync_with_lock( + &self, + sync: &mut Pin<&mut impl Stream>, + ) -> Result, Error> { + let guard = if self.with_locking { + self.client.encryption().spin_lock_store(Some(60000)).await.map_err(Error::LockError)? + } else { + None + }; + + Span::current().record("store_generation", guard.map(|guard| guard.generation())); + + Ok(sync.next().await) + } + /// Requests that the underlying sliding sync be stopped. /// /// This will unlock the cross-process lock, if taken. diff --git a/crates/matrix-sdk/src/encryption/mod.rs b/crates/matrix-sdk/src/encryption/mod.rs index e14251d39..5ce646359 100644 --- a/crates/matrix-sdk/src/encryption/mod.rs +++ b/crates/matrix-sdk/src/encryption/mod.rs @@ -201,6 +201,20 @@ pub enum VerificationState { Unverified, } +/// Wraps together a `CrossProcessLockStoreGuard` and a generation number. +#[derive(Debug)] +pub struct CrossProcessLockStoreGuardWithGeneration { + _guard: CrossProcessStoreLockGuard, + generation: u64, +} + +impl CrossProcessLockStoreGuardWithGeneration { + /// Return the Crypto Store generation associated with this store lock. + pub fn generation(&self) -> u64 { + self.generation + } +} + impl Client { pub(crate) async fn olm_machine(&self) -> RwLockReadGuard<'_, Option> { self.base_client().olm_machine().await @@ -1247,21 +1261,30 @@ impl Encryption { /// Maybe reload the `OlmMachine` after acquiring the lock for the first /// time. - async fn on_lock_newly_acquired(&self) -> Result<(), Error> { + /// + /// Returns the current generation number. + async fn on_lock_newly_acquired(&self) -> Result { let olm_machine_guard = self.client.olm_machine().await; if let Some(olm_machine) = olm_machine_guard.as_ref() { - // If the crypto store generation has changed, - if olm_machine + let (new_gen, generation_number) = olm_machine .maintain_crypto_store_generation(&self.client.locks().crypto_store_generation) - .await? - { + .await?; + // If the crypto store generation has changed, + if new_gen { // (get rid of the reference to the current crypto store first) drop(olm_machine_guard); // Recreate the OlmMachine. self.client.base_client().regenerate_olm().await?; } + Ok(generation_number) + } else { + // XXX: not sure this is reachable. Seems like the OlmMachine should always have + // been initialised by the time we get here. Ideally we'd panic, or return an + // error, but for now I'm just adding some logging to check if it + // happens, and returning the magic number 0. + warn!("Encryption::on_lock_newly_acquired: called before OlmMachine initialised"); + Ok(0) } - Ok(()) } /// If a lock was created with [`Self::enable_cross_process_store_lock`], @@ -1272,13 +1295,13 @@ impl Encryption { pub async fn spin_lock_store( &self, max_backoff: Option, - ) -> Result, Error> { + ) -> Result, Error> { if let Some(lock) = self.client.locks().cross_process_crypto_store_lock.get() { let guard = lock.spin_lock(max_backoff).await?; - self.on_lock_newly_acquired().await?; + let generation = self.on_lock_newly_acquired().await?; - Ok(Some(guard)) + Ok(Some(CrossProcessLockStoreGuardWithGeneration { _guard: guard, generation })) } else { Ok(None) } @@ -1288,15 +1311,19 @@ impl Encryption { /// attempts to lock it once. /// /// Returns a guard to the lock, if it was obtained. - pub async fn try_lock_store_once(&self) -> Result, Error> { + pub async fn try_lock_store_once( + &self, + ) -> Result, Error> { if let Some(lock) = self.client.locks().cross_process_crypto_store_lock.get() { let maybe_guard = lock.try_lock_once().await?; - if maybe_guard.is_some() { - self.on_lock_newly_acquired().await?; - } + let Some(guard) = maybe_guard else { + return Ok(None); + }; - Ok(maybe_guard) + let generation = self.on_lock_newly_acquired().await?; + + Ok(Some(CrossProcessLockStoreGuardWithGeneration { _guard: guard, generation })) } else { Ok(None) } diff --git a/crates/matrix-sdk/src/room/mod.rs b/crates/matrix-sdk/src/room/mod.rs index bc9fa0577..d3bdafe93 100644 --- a/crates/matrix-sdk/src/room/mod.rs +++ b/crates/matrix-sdk/src/room/mod.rs @@ -1448,12 +1448,13 @@ impl Room { // TODO: expose this publicly so people can pre-share a group session if // e.g. a user starts to type a message for a room. #[cfg(feature = "e2e-encryption")] - #[instrument(skip_all, fields(room_id = ?self.room_id()))] + #[instrument(skip_all, fields(room_id = ?self.room_id(), store_generation))] async fn preshare_room_key(&self) -> Result<()> { self.ensure_room_joined()?; // Take and release the lock on the store, if needs be. - let _guard = self.client.encryption().spin_lock_store(Some(60000)).await?; + let guard = self.client.encryption().spin_lock_store(Some(60000)).await?; + tracing::Span::current().record("store_generation", guard.map(|guard| guard.generation())); self.client .locks() diff --git a/crates/matrix-sdk/src/sliding_sync/mod.rs b/crates/matrix-sdk/src/sliding_sync/mod.rs index 9ffe486e3..438fff0c7 100644 --- a/crates/matrix-sdk/src/sliding_sync/mod.rs +++ b/crates/matrix-sdk/src/sliding_sync/mod.rs @@ -722,14 +722,11 @@ impl SlidingSync { pub fn sync(&self) -> impl Stream> + '_ { debug!("Starting sync stream"); - let sync_span = Span::current(); let mut internal_channel_receiver = self.inner.internal_channel.subscribe(); stream! { loop { - sync_span.in_scope(|| { - debug!("Sync stream is running"); - }); + debug!("Sync stream is running"); select! { biased; @@ -737,9 +734,7 @@ impl SlidingSync { internal_message = internal_channel_receiver.recv() => { use SlidingSyncInternalMessage::*; - sync_span.in_scope(|| { - debug!(?internal_message, "Sync stream has received an internal message"); - }); + debug!(?internal_message, "Sync stream has received an internal message"); match internal_message { Err(_) | Ok(SyncLoopStop) => { @@ -752,7 +747,7 @@ impl SlidingSync { } } - update_summary = self.sync_once().instrument(sync_span.clone()) => { + update_summary = self.sync_once() => { match update_summary { Ok(updates) => { yield Ok(updates); @@ -767,9 +762,7 @@ impl SlidingSync { Err(error) => { if error.client_api_error_kind() == Some(&ErrorKind::UnknownPos) { // The Sliding Sync session has expired. Let's reset `pos` and sticky parameters. - sync_span.in_scope(|| async { - self.expire_session().await; - }).await; + self.expire_session().await; } yield Err(error);