Add more logging for crypto store generation counter (#3207)

It's a bit unclear whether the crypto-store generation counter is doing the right thing
in terms of causing us to reload the OlmMachine. There is a suspicion that things
might be keeping hold of references to the old OlmMachine.

This PR attempts to add the generation number to the logging for any operations that
hold the cross-process lock. It's obviously not bulletproof: for example, it is possible
for the OlmMachine to be replaced without holding the lock; but hopefully this will
at least help us understand what's going on.
This commit is contained in:
Richard van der Hoff
2024-03-13 17:09:57 +00:00
committed by GitHub
parent 6f9147de86
commit 5e10ccc248
5 changed files with 101 additions and 63 deletions

View File

@@ -1927,6 +1927,8 @@ impl OlmMachine {
None => 0,
};
tracing::debug!("Initialising crypto store generation at {}", gen);
self.inner
.store
.set_custom_value(Self::CURRENT_GENERATION_STORE_KEY, gen.to_le_bytes().to_vec())
@@ -1939,19 +1941,32 @@ impl OlmMachine {
/// If needs be, update the local and on-disk crypto store generation.
///
/// Returns true whether another user has modified the internal generation
/// counter, and as such we've incremented and updated it in the
/// database.
///
/// ## Requirements
///
/// - This assumes that `initialize_crypto_store_generation` has been called
/// beforehand.
/// - This requires that the crypto store lock has been acquired.
pub async fn maintain_crypto_store_generation(
&self,
///
/// # Arguments
///
/// * `generation` - The in-memory generation counter (or rather, the
/// `Mutex` wrapping it). This defines the "expected" generation on entry,
/// and, if we determine an update is needed, is updated to hold the "new"
/// generation.
///
/// # Returns
///
/// A tuple containing:
///
/// * A `bool`, set to `true` if another process has updated the generation
/// number in the `Store` since our expected value, and as such we've
/// incremented and updated it in the database. Otherwise, `false`.
///
/// * The (possibly updated) generation counter.
pub async fn maintain_crypto_store_generation<'a>(
&'a self,
generation: &Mutex<Option<u64>>,
) -> StoreResult<bool> {
) -> StoreResult<(bool, u64)> {
let mut gen_guard = generation.lock().await;
// The database value must be there:
@@ -1973,10 +1988,10 @@ impl OlmMachine {
CryptoStoreError::InvalidLockGeneration("invalid format".to_owned())
})?);
let expected_gen = match gen_guard.as_ref() {
let new_gen = match gen_guard.as_ref() {
Some(expected_gen) => {
if actual_gen == *expected_gen {
return Ok(false);
return Ok((false, actual_gen));
}
// Increment the biggest, and store it everywhere.
actual_gen.max(*expected_gen).wrapping_add(1)
@@ -1992,22 +2007,19 @@ impl OlmMachine {
"Crypto store generation mismatch: previously known was {:?}, actual is {:?}, next is {}",
*gen_guard,
actual_gen,
expected_gen
new_gen
);
// Update known value.
*gen_guard = Some(expected_gen);
*gen_guard = Some(new_gen);
// Update value in database.
self.inner
.store
.set_custom_value(
Self::CURRENT_GENERATION_STORE_KEY,
expected_gen.to_le_bytes().to_vec(),
)
.set_custom_value(Self::CURRENT_GENERATION_STORE_KEY, new_gen.to_le_bytes().to_vec())
.await?;
Ok(true)
Ok((true, new_gen))
}
/// Manage dehydrated devices.

View File

@@ -26,7 +26,7 @@
//!
//! [NSE]: https://developer.apple.com/documentation/usernotifications/unnotificationserviceextension
use std::time::Duration;
use std::{pin::Pin, time::Duration};
use async_stream::stream;
use futures_core::stream::Stream;
@@ -34,7 +34,7 @@ use futures_util::{pin_mut, StreamExt};
use matrix_sdk::{Client, SlidingSync, LEASE_DURATION_MS};
use ruma::{api::client::sync::sync_events::v4, assign};
use tokio::sync::OwnedMutexGuard;
use tracing::{debug, trace};
use tracing::{debug, instrument, trace, Span};
/// Unit type representing a permit to *use* an [`EncryptionSyncService`].
///
@@ -143,6 +143,7 @@ impl EncryptionSyncService {
/// Note: the [`EncryptionSyncPermit`] parameter ensures that there's at
/// most one encryption sync running at any time. See its documentation
/// for more details.
#[instrument(skip_all, fields(store_generation))]
pub async fn run_fixed_iterations(
self,
num_iterations: u8,
@@ -152,7 +153,7 @@ impl EncryptionSyncService {
pin_mut!(sync);
let _lock_guard = if self.with_locking {
let lock_guard = if self.with_locking {
let mut lock_guard =
self.client.encryption().try_lock_store_once().await.map_err(Error::LockError)?;
@@ -192,6 +193,8 @@ impl EncryptionSyncService {
None
};
Span::current().record("store_generation", lock_guard.map(|guard| guard.generation()));
for _ in 0..num_iterations {
match sync.next().await {
Some(Ok(update_summary)) => {
@@ -241,17 +244,7 @@ impl EncryptionSyncService {
pin_mut!(sync);
loop {
let guard = if self.with_locking {
self.client
.encryption()
.spin_lock_store(Some(60000))
.await
.map_err(Error::LockError)?
} else {
None
};
match sync.next().await {
match self.next_sync_with_lock(&mut sync).await? {
Some(Ok(update_summary)) => {
// This API is only concerned with the e2ee and to-device extensions.
// Warn if anything weird has been received from the proxy.
@@ -264,18 +257,12 @@ impl EncryptionSyncService {
// Cool cool, let's do it again.
trace!("Encryption sync received an update!");
drop(guard);
yield Ok(());
continue;
}
Some(Err(err)) => {
trace!("Encryption sync stopped because of an error: {err:#}");
drop(guard);
yield Err(Error::SlidingSync(err));
break;
}
@@ -289,6 +276,24 @@ impl EncryptionSyncService {
})
}
/// Helper function for `sync`. Take the cross-process store lock, and call
/// `sync.next()`
#[instrument(skip_all, fields(store_generation))]
async fn next_sync_with_lock<Item>(
&self,
sync: &mut Pin<&mut impl Stream<Item = Item>>,
) -> Result<Option<Item>, Error> {
let guard = if self.with_locking {
self.client.encryption().spin_lock_store(Some(60000)).await.map_err(Error::LockError)?
} else {
None
};
Span::current().record("store_generation", guard.map(|guard| guard.generation()));
Ok(sync.next().await)
}
/// Requests that the underlying sliding sync be stopped.
///
/// This will unlock the cross-process lock, if taken.

View File

@@ -201,6 +201,20 @@ pub enum VerificationState {
Unverified,
}
/// Wraps together a `CrossProcessLockStoreGuard` and a generation number.
#[derive(Debug)]
pub struct CrossProcessLockStoreGuardWithGeneration {
_guard: CrossProcessStoreLockGuard,
generation: u64,
}
impl CrossProcessLockStoreGuardWithGeneration {
/// Return the Crypto Store generation associated with this store lock.
pub fn generation(&self) -> u64 {
self.generation
}
}
impl Client {
pub(crate) async fn olm_machine(&self) -> RwLockReadGuard<'_, Option<OlmMachine>> {
self.base_client().olm_machine().await
@@ -1247,21 +1261,30 @@ impl Encryption {
/// Maybe reload the `OlmMachine` after acquiring the lock for the first
/// time.
async fn on_lock_newly_acquired(&self) -> Result<(), Error> {
///
/// Returns the current generation number.
async fn on_lock_newly_acquired(&self) -> Result<u64, Error> {
let olm_machine_guard = self.client.olm_machine().await;
if let Some(olm_machine) = olm_machine_guard.as_ref() {
// If the crypto store generation has changed,
if olm_machine
let (new_gen, generation_number) = olm_machine
.maintain_crypto_store_generation(&self.client.locks().crypto_store_generation)
.await?
{
.await?;
// If the crypto store generation has changed,
if new_gen {
// (get rid of the reference to the current crypto store first)
drop(olm_machine_guard);
// Recreate the OlmMachine.
self.client.base_client().regenerate_olm().await?;
}
Ok(generation_number)
} else {
// XXX: not sure this is reachable. Seems like the OlmMachine should always have
// been initialised by the time we get here. Ideally we'd panic, or return an
// error, but for now I'm just adding some logging to check if it
// happens, and returning the magic number 0.
warn!("Encryption::on_lock_newly_acquired: called before OlmMachine initialised");
Ok(0)
}
Ok(())
}
/// If a lock was created with [`Self::enable_cross_process_store_lock`],
@@ -1272,13 +1295,13 @@ impl Encryption {
pub async fn spin_lock_store(
&self,
max_backoff: Option<u32>,
) -> Result<Option<CrossProcessStoreLockGuard>, Error> {
) -> Result<Option<CrossProcessLockStoreGuardWithGeneration>, Error> {
if let Some(lock) = self.client.locks().cross_process_crypto_store_lock.get() {
let guard = lock.spin_lock(max_backoff).await?;
self.on_lock_newly_acquired().await?;
let generation = self.on_lock_newly_acquired().await?;
Ok(Some(guard))
Ok(Some(CrossProcessLockStoreGuardWithGeneration { _guard: guard, generation }))
} else {
Ok(None)
}
@@ -1288,15 +1311,19 @@ impl Encryption {
/// attempts to lock it once.
///
/// Returns a guard to the lock, if it was obtained.
pub async fn try_lock_store_once(&self) -> Result<Option<CrossProcessStoreLockGuard>, Error> {
pub async fn try_lock_store_once(
&self,
) -> Result<Option<CrossProcessLockStoreGuardWithGeneration>, Error> {
if let Some(lock) = self.client.locks().cross_process_crypto_store_lock.get() {
let maybe_guard = lock.try_lock_once().await?;
if maybe_guard.is_some() {
self.on_lock_newly_acquired().await?;
}
let Some(guard) = maybe_guard else {
return Ok(None);
};
Ok(maybe_guard)
let generation = self.on_lock_newly_acquired().await?;
Ok(Some(CrossProcessLockStoreGuardWithGeneration { _guard: guard, generation }))
} else {
Ok(None)
}

View File

@@ -1448,12 +1448,13 @@ impl Room {
// TODO: expose this publicly so people can pre-share a group session if
// e.g. a user starts to type a message for a room.
#[cfg(feature = "e2e-encryption")]
#[instrument(skip_all, fields(room_id = ?self.room_id()))]
#[instrument(skip_all, fields(room_id = ?self.room_id(), store_generation))]
async fn preshare_room_key(&self) -> Result<()> {
self.ensure_room_joined()?;
// Take and release the lock on the store, if needs be.
let _guard = self.client.encryption().spin_lock_store(Some(60000)).await?;
let guard = self.client.encryption().spin_lock_store(Some(60000)).await?;
tracing::Span::current().record("store_generation", guard.map(|guard| guard.generation()));
self.client
.locks()

View File

@@ -722,14 +722,11 @@ impl SlidingSync {
pub fn sync(&self) -> impl Stream<Item = Result<UpdateSummary, crate::Error>> + '_ {
debug!("Starting sync stream");
let sync_span = Span::current();
let mut internal_channel_receiver = self.inner.internal_channel.subscribe();
stream! {
loop {
sync_span.in_scope(|| {
debug!("Sync stream is running");
});
debug!("Sync stream is running");
select! {
biased;
@@ -737,9 +734,7 @@ impl SlidingSync {
internal_message = internal_channel_receiver.recv() => {
use SlidingSyncInternalMessage::*;
sync_span.in_scope(|| {
debug!(?internal_message, "Sync stream has received an internal message");
});
debug!(?internal_message, "Sync stream has received an internal message");
match internal_message {
Err(_) | Ok(SyncLoopStop) => {
@@ -752,7 +747,7 @@ impl SlidingSync {
}
}
update_summary = self.sync_once().instrument(sync_span.clone()) => {
update_summary = self.sync_once() => {
match update_summary {
Ok(updates) => {
yield Ok(updates);
@@ -767,9 +762,7 @@ impl SlidingSync {
Err(error) => {
if error.client_api_error_kind() == Some(&ErrorKind::UnknownPos) {
// The Sliding Sync session has expired. Let's reset `pos` and sticky parameters.
sync_span.in_scope(|| async {
self.expire_session().await;
}).await;
self.expire_session().await;
}
yield Err(error);