mirror of
https://github.com/matrix-org/matrix-rust-sdk.git
synced 2026-04-29 11:35:24 -04:00
feat(client): expose a client pause/resume mechanism throughout the SDK
This patch exposes the pause/resume mechanism for SDK stores all the way up to the FFI `Client`, so apps can temporarily release SQLite resources when moving to the background and re-acquire them on resume. The main use case is iOS backgrounding, where keeping SQLite file descriptors and locks open can contribute to `0xdead10cc` terminations by the operating system.
This commit is contained in:
@@ -454,6 +454,24 @@ impl Client {
|
||||
Ok(self.inner.optimize_stores().await?)
|
||||
}
|
||||
|
||||
/// Pause the client, releasing all database connections and file locks.
|
||||
///
|
||||
/// Call this when the app enters the background on iOS to prevent
|
||||
/// `0xdead10cc` terminations. Waits for all in-flight database
|
||||
/// operations to complete before returning.
|
||||
///
|
||||
/// Call `resume()` when the app returns to the foreground.
|
||||
pub async fn pause(&self) -> Result<(), ClientError> {
|
||||
Ok(self.inner.pause().await?)
|
||||
}
|
||||
|
||||
/// Resume the client after a `pause()`, re-opening database connections.
|
||||
///
|
||||
/// Call this when the app returns to the foreground.
|
||||
pub async fn resume(&self) -> Result<(), ClientError> {
|
||||
Ok(self.inner.resume().await?)
|
||||
}
|
||||
|
||||
/// Returns the sizes of the existing stores, if known.
|
||||
pub async fn get_store_sizes(&self) -> Result<StoreSizes, ClientError> {
|
||||
Ok(self.inner.get_store_sizes().await?.into())
|
||||
|
||||
@@ -1143,6 +1143,32 @@ impl BaseClient {
|
||||
};
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
/// Pause all stores, releasing database connections and file locks.
|
||||
///
|
||||
/// In-flight operations will complete before this returns.
|
||||
pub async fn pause_stores(&self) -> Result<()> {
|
||||
self.state_store.pause().await?;
|
||||
self.event_cache_store.pause().await.map_err(Error::EventCacheStore)?;
|
||||
self.media_store.pause().await.map_err(Error::MediaStore)?;
|
||||
|
||||
#[cfg(feature = "e2e-encryption")]
|
||||
self.crypto_store.pause().await.map_err(Error::CryptoStore)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Resume all stores after a pause, re-opening database connections.
|
||||
pub async fn resume_stores(&self) -> Result<()> {
|
||||
#[cfg(feature = "e2e-encryption")]
|
||||
self.crypto_store.resume().await.map_err(Error::CryptoStore)?;
|
||||
|
||||
self.media_store.resume().await.map_err(Error::MediaStore)?;
|
||||
self.event_cache_store.resume().await.map_err(Error::EventCacheStore)?;
|
||||
self.state_store.resume().await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// Represent the `required_state` values sent by a sync request.
|
||||
|
||||
@@ -89,14 +89,14 @@ pub type Connection = Object;
|
||||
/// [`Manager`][managed::Manager] for creating and recycling SQLite
|
||||
/// [`Connection`]s.
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct Manager {
|
||||
pub struct Manager {
|
||||
pub(crate) database_path: PathBuf,
|
||||
}
|
||||
|
||||
impl Manager {
|
||||
/// Creates a new [`Manager`] for a database.
|
||||
#[must_use]
|
||||
pub(crate) fn new(database_path: PathBuf) -> Self {
|
||||
pub fn new(database_path: PathBuf) -> Self {
|
||||
Self { database_path }
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3295,6 +3295,50 @@ impl Client {
|
||||
self.inner.thread_subscription_catchup.get().unwrap()
|
||||
}
|
||||
|
||||
/// Pause the client for background suspension.
|
||||
///
|
||||
/// This method:
|
||||
/// 1. Disables all send queues (prevents new message sends).
|
||||
/// 2. Pauses all database stores, waiting for in-flight operations and
|
||||
/// releasing all SQLite connections and file locks.
|
||||
///
|
||||
/// Call [`Client::resume()`] when the app returns to the foreground.
|
||||
///
|
||||
/// # iOS
|
||||
///
|
||||
/// Call this before the app is suspended to avoid `0xdead10cc` kills.
|
||||
/// Typically called from `applicationDidEnterBackground` or an
|
||||
/// equivalent SwiftUI lifecycle event, *after* stopping the
|
||||
/// `matrix_sdk_ui::sync_service::SyncService`.
|
||||
pub async fn pause(&self) -> Result<()> {
|
||||
info!("Client::pause — releasing database resources");
|
||||
|
||||
// Disable send queues so no new sends hit the stores.
|
||||
self.send_queue().set_enabled(false).await;
|
||||
|
||||
// Pause all stores (waits for in-flight ops, closes connections).
|
||||
self.base_client().pause_stores().await?;
|
||||
|
||||
info!("Client::pause — complete, all database connections released");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Resume the client after a [`Client::pause()`].
|
||||
///
|
||||
/// Re-opens database connections and re-enables send queues.
|
||||
pub async fn resume(&self) -> Result<()> {
|
||||
info!("Client::resume — re-acquiring database resources");
|
||||
|
||||
// Resume stores (creates new connection pools).
|
||||
self.base_client().resume_stores().await?;
|
||||
|
||||
// Re-enable send queues.
|
||||
self.send_queue().set_enabled(true).await;
|
||||
|
||||
info!("Client::resume — complete");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Perform database optimizations if any are available, i.e. vacuuming in
|
||||
/// SQLite.
|
||||
///
|
||||
|
||||
@@ -63,6 +63,8 @@ use ruma::{
|
||||
use serde_json::{Value as JsonValue, json};
|
||||
use stream_assert::{assert_next_matches, assert_pending};
|
||||
use tempfile::tempdir;
|
||||
#[cfg(feature = "sqlite")]
|
||||
use tokio::time::timeout;
|
||||
use tokio_stream::wrappers::BroadcastStream;
|
||||
use wiremock::{
|
||||
Mock, Request, ResponseTemplate,
|
||||
@@ -1437,6 +1439,121 @@ async fn test_restore_room() {
|
||||
assert!(!room.pinned_event_ids().unwrap().is_empty());
|
||||
}
|
||||
|
||||
#[async_test]
|
||||
#[cfg(feature = "sqlite")]
|
||||
async fn test_client_pause_resume_with_sqlite_store() {
|
||||
let tempdir = tempdir().unwrap();
|
||||
|
||||
let server = MatrixMockServer::new().await;
|
||||
let client = server
|
||||
.client_builder()
|
||||
.on_builder(|builder| builder.sqlite_store(tempdir.path(), None))
|
||||
.build()
|
||||
.await;
|
||||
|
||||
client
|
||||
.state_store()
|
||||
.set_custom_value(b"pause_resume_key", b"before_pause".to_vec())
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(
|
||||
client.state_store().get_custom_value(b"pause_resume_key").await.unwrap().as_deref(),
|
||||
Some(b"before_pause".as_slice())
|
||||
);
|
||||
|
||||
client.pause().await.unwrap();
|
||||
|
||||
let read_err = client.state_store().get_custom_value(b"pause_resume_key").await.unwrap_err();
|
||||
assert!(
|
||||
read_err.to_string().contains("paused"),
|
||||
"read while paused should mention 'paused', got: {read_err}"
|
||||
);
|
||||
|
||||
let write_err = client
|
||||
.state_store()
|
||||
.set_custom_value(b"pause_resume_key", b"while_paused".to_vec())
|
||||
.await
|
||||
.unwrap_err();
|
||||
assert!(
|
||||
write_err.to_string().contains("paused"),
|
||||
"write while paused should mention 'paused', got: {write_err}"
|
||||
);
|
||||
|
||||
client.resume().await.unwrap();
|
||||
|
||||
assert_eq!(
|
||||
client.state_store().get_custom_value(b"pause_resume_key").await.unwrap().as_deref(),
|
||||
Some(b"before_pause".as_slice())
|
||||
);
|
||||
|
||||
client
|
||||
.state_store()
|
||||
.set_custom_value(b"pause_resume_key", b"after_resume".to_vec())
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(
|
||||
client.state_store().get_custom_value(b"pause_resume_key").await.unwrap().as_deref(),
|
||||
Some(b"after_resume".as_slice())
|
||||
);
|
||||
}
|
||||
|
||||
#[async_test]
|
||||
#[cfg(feature = "sqlite")]
|
||||
async fn test_client_pause_waits_for_held_state_store_write() {
|
||||
let tempdir = tempdir().unwrap();
|
||||
|
||||
let server = MatrixMockServer::new().await;
|
||||
let client = server
|
||||
.client_builder()
|
||||
.on_builder(|builder| builder.sqlite_store(tempdir.path(), None))
|
||||
.build()
|
||||
.await;
|
||||
|
||||
client.state_store().set_custom_value(b"held_write_key", b"initial".to_vec()).await.unwrap();
|
||||
|
||||
let write_fut = client.state_store().set_custom_value(b"held_write_key", b"updated".to_vec());
|
||||
tokio::pin!(write_fut);
|
||||
|
||||
assert!(
|
||||
write_fut.as_mut().now_or_never().is_none(),
|
||||
"write future should not complete immediately before being awaited"
|
||||
);
|
||||
|
||||
let client_clone = client.clone();
|
||||
let pause_handle = spawn(async move {
|
||||
client_clone.pause().await.unwrap();
|
||||
});
|
||||
|
||||
tokio::time::sleep(Duration::from_millis(200)).await;
|
||||
|
||||
assert!(
|
||||
!pause_handle.is_finished(),
|
||||
"pause should wait for the in-flight state store write to finish"
|
||||
);
|
||||
|
||||
write_fut.await.unwrap();
|
||||
|
||||
timeout(Duration::from_secs(3), pause_handle)
|
||||
.await
|
||||
.expect("pause should complete after the held write finishes")
|
||||
.unwrap();
|
||||
|
||||
let paused_err = client.state_store().get_custom_value(b"held_write_key").await.unwrap_err();
|
||||
assert!(
|
||||
paused_err.to_string().contains("paused"),
|
||||
"store should be paused after pause completes, got: {paused_err}"
|
||||
);
|
||||
|
||||
client.resume().await.unwrap();
|
||||
|
||||
assert_eq!(
|
||||
client.state_store().get_custom_value(b"held_write_key").await.unwrap().as_deref(),
|
||||
Some(b"updated".as_slice())
|
||||
);
|
||||
}
|
||||
|
||||
#[async_test]
|
||||
async fn test_logout() {
|
||||
let server = MatrixMockServer::new().await;
|
||||
|
||||
Reference in New Issue
Block a user