From 01bb8093d0bf64e30ab8a41552b6146ff2cda65a Mon Sep 17 00:00:00 2001 From: Benjamin Bouvier Date: Thu, 6 Mar 2025 12:41:55 +0100 Subject: [PATCH] feat(ffi): add a function to setup a lightweight tokio runtime Creating many threads may use a bit of memory: on a machine with N devices, exactly N*2 MB of memory may be consumed. That might be a lot for a NSE process on iOS, which can only have up to 16 MB of RAM allocated for it. For this case, we introduce a new FFI method `setup_lightweight_tokio_runtime` which will spawn at most 4 worker threads and 1 blocking thread. This should be sufficient for most use cases. --- .deny.toml | 2 +- Cargo.lock | 4 +-- Cargo.toml | 2 +- bindings/matrix-sdk-ffi/Cargo.toml | 2 +- bindings/matrix-sdk-ffi/src/client.rs | 19 ++++++----- bindings/matrix-sdk-ffi/src/client_builder.rs | 5 +-- bindings/matrix-sdk-ffi/src/encryption.rs | 12 +++---- bindings/matrix-sdk-ffi/src/lib.rs | 1 - bindings/matrix-sdk-ffi/src/platform.rs | 32 +++++++++++++++++++ bindings/matrix-sdk-ffi/src/room.rs | 14 ++++---- .../src/room_directory_search.rs | 4 +-- bindings/matrix-sdk-ffi/src/room_list.rs | 13 ++++---- .../src/session_verification.rs | 9 +++--- bindings/matrix-sdk-ffi/src/sync_service.rs | 4 +-- bindings/matrix-sdk-ffi/src/timeline/mod.rs | 14 ++++---- bindings/matrix-sdk-ffi/src/utils.rs | 4 +-- bindings/matrix-sdk-ffi/src/widget.rs | 5 +-- 17 files changed, 92 insertions(+), 54 deletions(-) diff --git a/.deny.toml b/.deny.toml index 527040dbc..9aaa10c60 100644 --- a/.deny.toml +++ b/.deny.toml @@ -60,7 +60,7 @@ allow-git = [ # A patch override for the bindings: https://github.com/rodrimati1992/const_panic/pull/10 "https://github.com/jplatte/const_panic", # A patch override for the bindings: https://github.com/smol-rs/async-compat/pull/22 - "https://github.com/jplatte/async-compat", + "https://github.com/element-hq/async-compat", # We can release vodozemac whenever we need but let's not block development # on releases. "https://github.com/matrix-org/vodozemac", diff --git a/Cargo.lock b/Cargo.lock index 3c8119795..0f2bd3cd8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -287,8 +287,8 @@ dependencies = [ [[package]] name = "async-compat" -version = "0.2.2" -source = "git+https://github.com/jplatte/async-compat?rev=16dc8597ec09a6102d58d4e7b67714a35dd0ecb8#16dc8597ec09a6102d58d4e7b67714a35dd0ecb8" +version = "0.2.5" +source = "git+https://github.com/element-hq/async-compat?rev=5a27c8b290f1f1dcfc0c4ec22c464e38528aa591#5a27c8b290f1f1dcfc0c4ec22c464e38528aa591" dependencies = [ "futures-core", "futures-io", diff --git a/Cargo.toml b/Cargo.toml index 7c2aabc50..0b65ff6fa 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -147,7 +147,7 @@ lto = false debug = true [patch.crates-io] -async-compat = { git = "https://github.com/jplatte/async-compat", rev = "16dc8597ec09a6102d58d4e7b67714a35dd0ecb8" } +async-compat = { git = "https://github.com/element-hq/async-compat", rev = "5a27c8b290f1f1dcfc0c4ec22c464e38528aa591" } const_panic = { git = "https://github.com/jplatte/const_panic", rev = "9024a4cb3eac45c1d2d980f17aaee287b17be498" } # Needed to fix rotation log issue on Android (https://github.com/tokio-rs/tracing/issues/2937) tracing = { git = "https://github.com/element-hq/tracing.git", rev = "ca9431f74d37c9d3b5e6a9f35b2c706711dab7dd" } diff --git a/bindings/matrix-sdk-ffi/Cargo.toml b/bindings/matrix-sdk-ffi/Cargo.toml index b771e6f57..bc44d3d80 100644 --- a/bindings/matrix-sdk-ffi/Cargo.toml +++ b/bindings/matrix-sdk-ffi/Cargo.toml @@ -23,7 +23,7 @@ vergen = { version = "8.1.3", features = ["build", "git", "gitcl"] } [dependencies] anyhow = { workspace = true } as_variant = { workspace = true } -async-compat = "0.2.1" +async-compat = "0.2.5" eyeball-im = { workspace = true } extension-trait = "1.0.1" futures-util = { workspace = true } diff --git a/bindings/matrix-sdk-ffi/src/client.rs b/bindings/matrix-sdk-ffi/src/client.rs index 9b908b670..760f19548 100644 --- a/bindings/matrix-sdk-ffi/src/client.rs +++ b/bindings/matrix-sdk-ffi/src/client.rs @@ -5,6 +5,7 @@ use std::{ }; use anyhow::{anyhow, Context as _}; +use async_compat::get_runtime_handle; use matrix_sdk::{ authentication::oidc::{ registrations::ClientId, requests::account_management::AccountManagementActionFull, @@ -68,7 +69,7 @@ use tokio::sync::broadcast::error::RecvError; use tracing::{debug, error}; use url::Url; -use super::{room::Room, session_verification::SessionVerificationController, RUNTIME}; +use super::{room::Room, session_verification::SessionVerificationController}; use crate::{ authentication::{HomeserverLoginDetails, OidcConfiguration, OidcError, SsoError, SsoHandler}, client, @@ -500,7 +501,7 @@ impl Client { let q = self.inner.send_queue(); let mut subscriber = q.subscribe_errors(); - Arc::new(TaskHandle::new(RUNTIME.spawn(async move { + Arc::new(TaskHandle::new(get_runtime_handle().spawn(async move { // Respawn tasks for rooms that had unsent events. At this point we've just // created the subscriber, so it'll be notified about errors. q.respawn_tasks_for_rooms_with_unsent_requests().await; @@ -580,7 +581,7 @@ impl Client { delegate.map(|delegate| { let mut session_change_receiver = self.inner.subscribe_to_session_changes(); let client_clone = self.clone(); - let session_change_task = RUNTIME.spawn(async move { + let session_change_task = get_runtime_handle().spawn(async move { loop { match session_change_receiver.recv().await { Ok(session_change) => client_clone.process_session_change(session_change), @@ -666,7 +667,9 @@ impl Client { /// Retrieves an avatar cached from a previous call to [`Self::avatar_url`]. pub fn cached_avatar_url(&self) -> Result, ClientError> { - Ok(RUNTIME.block_on(self.inner.account().get_cached_avatar_url())?.map(Into::into)) + Ok(get_runtime_handle() + .block_on(self.inner.account().get_cached_avatar_url())? + .map(Into::into)) } pub fn device_id(&self) -> Result { @@ -712,7 +715,7 @@ impl Client { if let Some(progress_watcher) = progress_watcher { let mut subscriber = request.subscribe_to_send_progress(); - RUNTIME.spawn(async move { + get_runtime_handle().spawn(async move { while let Some(progress) = subscriber.next().await { progress_watcher.transmission_progress(progress.into()); } @@ -929,7 +932,7 @@ impl Client { } pub fn get_notification_settings(&self) -> Arc { - let inner = RUNTIME.block_on(self.inner.notification_settings()); + let inner = get_runtime_handle().block_on(self.inner.notification_settings()); Arc::new(NotificationSettings::new((*self.inner).clone(), inner)) } @@ -974,7 +977,7 @@ impl Client { listener: Box, ) -> Arc { let mut subscriber = self.inner.subscribe_to_ignore_user_list_changes(); - Arc::new(TaskHandle::new(RUNTIME.spawn(async move { + Arc::new(TaskHandle::new(get_runtime_handle().spawn(async move { while let Some(user_ids) = subscriber.next().await { listener.call(user_ids); } @@ -1283,7 +1286,7 @@ impl Client { fn process_session_change(&self, session_change: SessionChange) { if let Some(delegate) = self.delegate.read().unwrap().clone() { debug!("Applying session change: {session_change:?}"); - RUNTIME.spawn_blocking(move || match session_change { + get_runtime_handle().spawn_blocking(move || match session_change { SessionChange::UnknownToken { soft_logout } => { delegate.did_receive_auth_error(soft_logout); } diff --git a/bindings/matrix-sdk-ffi/src/client_builder.rs b/bindings/matrix-sdk-ffi/src/client_builder.rs index 72b161e19..26a1a6819 100644 --- a/bindings/matrix-sdk-ffi/src/client_builder.rs +++ b/bindings/matrix-sdk-ffi/src/client_builder.rs @@ -1,5 +1,6 @@ use std::{fs, num::NonZeroUsize, path::Path, sync::Arc, time::Duration}; +use async_compat::get_runtime_handle; use futures_util::StreamExt; use matrix_sdk::{ authentication::oidc::qrcode::{self, DeviceCodeErrorResponseType, LoginFailureReason}, @@ -22,7 +23,7 @@ use ruma::api::error::{DeserializationError, FromHttpResponseError}; use tracing::{debug, error}; use zeroize::Zeroizing; -use super::{client::Client, RUNTIME}; +use super::client::Client; use crate::{ authentication::OidcConfiguration, client::ClientSessionDelegate, error::ClientError, helpers::unwrap_or_clone_arc, task_handle::TaskHandle, @@ -698,7 +699,7 @@ impl ClientBuilder { // We create this task, which will get cancelled once it's dropped, just in case // the progress stream doesn't end. - let _progress_task = TaskHandle::new(RUNTIME.spawn(async move { + let _progress_task = TaskHandle::new(get_runtime_handle().spawn(async move { while let Some(state) = progress.next().await { progress_listener.on_update(state.into()); } diff --git a/bindings/matrix-sdk-ffi/src/encryption.rs b/bindings/matrix-sdk-ffi/src/encryption.rs index 1272d9ff5..7b15044a0 100644 --- a/bindings/matrix-sdk-ffi/src/encryption.rs +++ b/bindings/matrix-sdk-ffi/src/encryption.rs @@ -1,5 +1,6 @@ use std::sync::Arc; +use async_compat::get_runtime_handle; use futures_util::StreamExt; use matrix_sdk::{ encryption, @@ -9,7 +10,6 @@ use thiserror::Error; use tracing::{error, info}; use zeroize::Zeroize; -use super::RUNTIME; use crate::{client::Client, error::ClientError, ruma::AuthData, task_handle::TaskHandle}; #[derive(uniffi::Object)] @@ -230,7 +230,7 @@ impl Encryption { pub fn backup_state_listener(&self, listener: Box) -> Arc { let mut stream = self.inner.backups().state_stream(); - let stream_task = TaskHandle::new(RUNTIME.spawn(async move { + let stream_task = TaskHandle::new(get_runtime_handle().spawn(async move { while let Some(state) = stream.next().await { let Ok(state) = state else { continue }; listener.on_update(state.into()); @@ -267,7 +267,7 @@ impl Encryption { ) -> Arc { let mut stream = self.inner.recovery().state_stream(); - let stream_task = TaskHandle::new(RUNTIME.spawn(async move { + let stream_task = TaskHandle::new(get_runtime_handle().spawn(async move { while let Some(state) = stream.next().await { listener.on_update(state.into()); } @@ -294,7 +294,7 @@ impl Encryption { let task = if let Some(listener) = progress_listener { let mut progress_stream = wait_for_steady_state.subscribe_to_progress(); - Some(RUNTIME.spawn(async move { + Some(get_runtime_handle().spawn(async move { while let Some(progress) = progress_stream.next().await { let Ok(progress) = progress else { continue }; listener.on_update(progress.into()); @@ -335,7 +335,7 @@ impl Encryption { let mut progress_stream = enable.subscribe_to_progress(); - let task = RUNTIME.spawn(async move { + let task = get_runtime_handle().spawn(async move { while let Some(progress) = progress_stream.next().await { let Ok(progress) = progress else { continue }; progress_listener.on_update(progress.into()); @@ -400,7 +400,7 @@ impl Encryption { ) -> Arc { let mut subscriber = self.inner.verification_state(); - Arc::new(TaskHandle::new(RUNTIME.spawn(async move { + Arc::new(TaskHandle::new(get_runtime_handle().spawn(async move { while let Some(verification_state) = subscriber.next().await { listener.on_update(verification_state.into()); } diff --git a/bindings/matrix-sdk-ffi/src/lib.rs b/bindings/matrix-sdk-ffi/src/lib.rs index 67c82309f..8ea92d61f 100644 --- a/bindings/matrix-sdk-ffi/src/lib.rs +++ b/bindings/matrix-sdk-ffi/src/lib.rs @@ -34,7 +34,6 @@ mod tracing; mod utils; mod widget; -use async_compat::TOKIO1 as RUNTIME; use matrix_sdk::ruma::events::room::message::RoomMessageEventContentWithoutRelation; use self::{ diff --git a/bindings/matrix-sdk-ffi/src/platform.rs b/bindings/matrix-sdk-ffi/src/platform.rs index 59e921969..9fd03c2ce 100644 --- a/bindings/matrix-sdk-ffi/src/platform.rs +++ b/bindings/matrix-sdk-ffi/src/platform.rs @@ -358,6 +358,38 @@ pub fn setup_tracing(config: TracingConfiguration) { .init(); } +/// Set up a lightweight tokio runtime, for processes that have memory +/// limitations (like the NSE process on iOS). +#[matrix_sdk_ffi_macros::export] +pub fn setup_lightweight_tokio_runtime() { + async_compat::set_runtime_builder(Box::new(|| { + eprintln!("spawning a lightweight tokio runtime"); + + // Get the number of available cores through the system, if possible. + let num_available_cores = + std::thread::available_parallelism().map(|n| n.get()).unwrap_or(1); + + // The number of worker threads will be either that or 4, whichever is smaller. + let num_worker_threads = num_available_cores.min(4); + + // Chosen by a fair dice roll. + let num_blocking_threads = 2; + + // 1 MiB of memory per worker thread. Should be enough for everyone™. + let max_memory_bytes = 1024 * 1024; + + let mut builder = tokio::runtime::Builder::new_multi_thread(); + + builder + .enable_all() + .worker_threads(num_worker_threads) + .thread_stack_size(max_memory_bytes) + .max_blocking_threads(num_blocking_threads); + + builder + })); +} + #[cfg(test)] mod tests { use super::build_tracing_filter; diff --git a/bindings/matrix-sdk-ffi/src/room.rs b/bindings/matrix-sdk-ffi/src/room.rs index d286603b2..7af966a25 100644 --- a/bindings/matrix-sdk-ffi/src/room.rs +++ b/bindings/matrix-sdk-ffi/src/room.rs @@ -1,6 +1,7 @@ use std::{collections::HashMap, pin::pin, sync::Arc}; use anyhow::{Context, Result}; +use async_compat::get_runtime_handle; use futures_util::{pin_mut, StreamExt}; use matrix_sdk::{ crypto::LocalTrust, @@ -30,7 +31,6 @@ use ruma::{ use tokio::sync::RwLock; use tracing::{error, warn}; -use super::RUNTIME; use crate::{ chunk_iterator::ChunkIterator, client::{JoinRule, RoomVisibility}, @@ -111,7 +111,7 @@ impl Room { } pub fn is_direct(&self) -> bool { - RUNTIME.block_on(self.inner.is_direct()).unwrap_or(false) + get_runtime_handle().block_on(self.inner.is_direct()).unwrap_or(false) } pub fn is_public(&self) -> bool { @@ -292,7 +292,7 @@ impl Room { listener: Box, ) -> Arc { let mut subscriber = self.inner.subscribe_info(); - Arc::new(TaskHandle::new(RUNTIME.spawn(async move { + Arc::new(TaskHandle::new(get_runtime_handle().spawn(async move { while subscriber.next().await.is_some() { match self.room_info().await { Ok(room_info) => listener.call(room_info), @@ -596,7 +596,7 @@ impl Room { self: Arc, listener: Box, ) -> Arc { - Arc::new(TaskHandle::new(RUNTIME.spawn(async move { + Arc::new(TaskHandle::new(get_runtime_handle().spawn(async move { let (_event_handler_drop_guard, mut subscriber) = self.inner.subscribe_to_typing_notifications(); while let Ok(typing_user_ids) = subscriber.recv().await { @@ -612,7 +612,7 @@ impl Room { listener: Box, ) -> Arc { let room = self.inner.clone(); - Arc::new(TaskHandle::new(RUNTIME.spawn(async move { + Arc::new(TaskHandle::new(get_runtime_handle().spawn(async move { let status_changes = room.subscribe_to_identity_status_changes().await; if let Ok(status_changes) = status_changes { // TODO: what to do with failures? @@ -880,7 +880,7 @@ impl Room { ) -> Result, ClientError> { let (stream, seen_ids_cleanup_handle) = self.inner.subscribe_to_knock_requests().await?; - let handle = Arc::new(TaskHandle::new(RUNTIME.spawn(async move { + let handle = Arc::new(TaskHandle::new(get_runtime_handle().spawn(async move { pin_mut!(stream); while let Some(requests) = stream.next().await { listener.call(requests.into_iter().map(Into::into).collect()); @@ -1030,7 +1030,7 @@ impl Room { ) -> Arc { let room = self.inner.clone(); - Arc::new(TaskHandle::new(RUNTIME.spawn(async move { + Arc::new(TaskHandle::new(get_runtime_handle().spawn(async move { let subscription = room.observe_live_location_shares(); let mut stream = subscription.subscribe(); let mut pinned_stream = pin!(stream); diff --git a/bindings/matrix-sdk-ffi/src/room_directory_search.rs b/bindings/matrix-sdk-ffi/src/room_directory_search.rs index 73baceff5..b266cca1a 100644 --- a/bindings/matrix-sdk-ffi/src/room_directory_search.rs +++ b/bindings/matrix-sdk-ffi/src/room_directory_search.rs @@ -15,13 +15,13 @@ use std::{fmt::Debug, sync::Arc}; +use async_compat::get_runtime_handle; use eyeball_im::VectorDiff; use futures_util::StreamExt; use matrix_sdk::room_directory_search::RoomDirectorySearch as SdkRoomDirectorySearch; use ruma::ServerName; use tokio::sync::RwLock; -use super::RUNTIME; use crate::{error::ClientError, task_handle::TaskHandle}; #[derive(uniffi::Enum)] @@ -137,7 +137,7 @@ impl RoomDirectorySearch { ) -> Arc { let (initial_values, mut stream) = self.inner.read().await.results(); - Arc::new(TaskHandle::new(RUNTIME.spawn(async move { + Arc::new(TaskHandle::new(get_runtime_handle().spawn(async move { listener.on_update(vec![RoomDirectorySearchEntryUpdate::Reset { values: initial_values.into_iter().map(Into::into).collect(), }]); diff --git a/bindings/matrix-sdk-ffi/src/room_list.rs b/bindings/matrix-sdk-ffi/src/room_list.rs index 7ababe61d..7fdfabdff 100644 --- a/bindings/matrix-sdk-ffi/src/room_list.rs +++ b/bindings/matrix-sdk-ffi/src/room_list.rs @@ -2,6 +2,7 @@ use std::{fmt::Debug, mem::MaybeUninit, ptr::addr_of_mut, sync::Arc, time::Duration}; +use async_compat::get_runtime_handle; use eyeball_im::VectorDiff; use futures_util::{pin_mut, StreamExt, TryFutureExt}; use matrix_sdk::ruma::{ @@ -28,7 +29,7 @@ use crate::{ room_preview::RoomPreview, timeline::{configuration::TimelineEventTypeFilter, EventTimelineItem, Timeline}, utils::AsyncRuntimeDropped, - TaskHandle, RUNTIME, + TaskHandle, }; #[derive(Debug, thiserror::Error, uniffi::Error)] @@ -91,7 +92,7 @@ impl RoomListService { fn state(&self, listener: Box) -> Arc { let state_stream = self.inner.state(); - Arc::new(TaskHandle::new(RUNTIME.spawn(async move { + Arc::new(TaskHandle::new(get_runtime_handle().spawn(async move { pin_mut!(state_stream); while let Some(state) = state_stream.next().await { @@ -127,7 +128,7 @@ impl RoomListService { Duration::from_millis(delay_before_hiding_in_ms.into()), ); - Arc::new(TaskHandle::new(RUNTIME.spawn(async move { + Arc::new(TaskHandle::new(get_runtime_handle().spawn(async move { pin_mut!(sync_indicator_stream); while let Some(sync_indicator) = sync_indicator_stream.next().await { @@ -166,7 +167,7 @@ impl RoomList { Ok(RoomListLoadingStateResult { state: loading_state.get().into(), - state_stream: Arc::new(TaskHandle::new(RUNTIME.spawn(async move { + state_stream: Arc::new(TaskHandle::new(get_runtime_handle().spawn(async move { pin_mut!(loading_state); while let Some(loading_state) = loading_state.next().await { @@ -236,7 +237,7 @@ impl RoomList { let dynamic_entries_controller = Arc::new(RoomListDynamicEntriesController::new(dynamic_entries_controller)); - let entries_stream = Arc::new(TaskHandle::new(RUNTIME.spawn(async move { + let entries_stream = Arc::new(TaskHandle::new(get_runtime_handle().spawn(async move { pin_mut!(entries_stream); while let Some(diffs) = entries_stream.next().await { @@ -557,7 +558,7 @@ impl RoomListItem { } fn is_direct(&self) -> bool { - RUNTIME.block_on(self.inner.inner_room().is_direct()).unwrap_or(false) + get_runtime_handle().block_on(self.inner.inner_room().is_direct()).unwrap_or(false) } fn canonical_alias(&self) -> Option { diff --git a/bindings/matrix-sdk-ffi/src/session_verification.rs b/bindings/matrix-sdk-ffi/src/session_verification.rs index 84228e33c..402e69734 100644 --- a/bindings/matrix-sdk-ffi/src/session_verification.rs +++ b/bindings/matrix-sdk-ffi/src/session_verification.rs @@ -1,5 +1,6 @@ use std::sync::{Arc, RwLock}; +use async_compat::get_runtime_handle; use futures_util::StreamExt; use matrix_sdk::{ encryption::{ @@ -13,7 +14,6 @@ use matrix_sdk::{ use ruma::UserId; use tracing::{error, warn}; -use super::RUNTIME; use crate::{client::UserProfile, error::ClientError, utils::Timestamp}; #[derive(uniffi::Object)] @@ -165,7 +165,8 @@ impl SessionVerificationController { } let delegate = self.delegate.clone(); - RUNTIME.spawn(Self::listen_to_sas_verification_changes(verification, delegate)); + get_runtime_handle() + .spawn(Self::listen_to_sas_verification_changes(verification, delegate)); } _ => { if let Some(delegate) = &*self.delegate.read().unwrap() { @@ -290,7 +291,7 @@ impl SessionVerificationController { *self.verification_request.write().unwrap() = Some(verification_request.clone()); - RUNTIME.spawn(Self::listen_to_verification_request_changes( + get_runtime_handle().spawn(Self::listen_to_verification_request_changes( verification_request, self.sas_verification.clone(), self.delegate.clone(), @@ -322,7 +323,7 @@ impl SessionVerificationController { } let delegate = delegate.clone(); - RUNTIME.spawn(Self::listen_to_sas_verification_changes( + get_runtime_handle().spawn(Self::listen_to_sas_verification_changes( verification, delegate, )); diff --git a/bindings/matrix-sdk-ffi/src/sync_service.rs b/bindings/matrix-sdk-ffi/src/sync_service.rs index 8ca09cc61..eb56479ac 100644 --- a/bindings/matrix-sdk-ffi/src/sync_service.rs +++ b/bindings/matrix-sdk-ffi/src/sync_service.rs @@ -14,6 +14,7 @@ use std::{fmt::Debug, sync::Arc, time::Duration}; +use async_compat::get_runtime_handle; use futures_util::pin_mut; use matrix_sdk::{crypto::types::events::UtdCause, Client}; use matrix_sdk_ui::{ @@ -29,7 +30,6 @@ use tracing::error; use crate::{ error::ClientError, helpers::unwrap_or_clone_arc, room_list::RoomListService, TaskHandle, - RUNTIME, }; #[derive(uniffi::Enum)] @@ -84,7 +84,7 @@ impl SyncService { pub fn state(&self, listener: Box) -> Arc { let state_stream = self.inner.state(); - Arc::new(TaskHandle::new(RUNTIME.spawn(async move { + Arc::new(TaskHandle::new(get_runtime_handle().spawn(async move { pin_mut!(state_stream); while let Some(state) = state_stream.next().await { diff --git a/bindings/matrix-sdk-ffi/src/timeline/mod.rs b/bindings/matrix-sdk-ffi/src/timeline/mod.rs index e070b4e77..fb28b48cd 100644 --- a/bindings/matrix-sdk-ffi/src/timeline/mod.rs +++ b/bindings/matrix-sdk-ffi/src/timeline/mod.rs @@ -16,6 +16,7 @@ use std::{collections::HashMap, fmt::Write as _, fs, panic, sync::Arc}; use anyhow::{Context, Result}; use as_variant::as_variant; +use async_compat::get_runtime_handle; use content::{InReplyToDetails, RepliedToEventDetails}; use eyeball_im::VectorDiff; use futures_util::{pin_mut, StreamExt as _}; @@ -73,7 +74,6 @@ use crate::{ }, task_handle::TaskHandle, utils::Timestamp, - RUNTIME, }; pub mod configuration; @@ -124,7 +124,7 @@ impl Timeline { .formatted_caption(formatted_caption) .mentions(params.mentions.map(Into::into)); - let handle = SendAttachmentJoinHandle::new(RUNTIME.spawn(async move { + let handle = SendAttachmentJoinHandle::new(get_runtime_handle().spawn(async move { let mut request = self.inner.send_attachment(params.filename, mime_type, attachment_config); @@ -134,7 +134,7 @@ impl Timeline { if let Some(progress_watcher) = progress_watcher { let mut subscriber = request.subscribe_to_send_progress(); - RUNTIME.spawn(async move { + get_runtime_handle().spawn(async move { while let Some(progress) = subscriber.next().await { progress_watcher.transmission_progress(progress.into()); } @@ -215,7 +215,7 @@ impl Timeline { pub async fn add_listener(&self, listener: Box) -> Arc { let (timeline_items, timeline_stream) = self.inner.subscribe().await; - Arc::new(TaskHandle::new(RUNTIME.spawn(async move { + Arc::new(TaskHandle::new(get_runtime_handle().spawn(async move { pin_mut!(timeline_stream); // It's important that the initial items are passed *before* we forward the @@ -237,7 +237,7 @@ impl Timeline { } pub fn retry_decryption(self: Arc, session_ids: Vec) { - RUNTIME.spawn(async move { + get_runtime_handle().spawn(async move { self.inner.retry_decryption(&session_ids).await; }); } @@ -256,7 +256,7 @@ impl Timeline { .await .context("can't subscribe to the back-pagination status on a focused timeline")?; - Ok(Arc::new(TaskHandle::new(RUNTIME.spawn(async move { + Ok(Arc::new(TaskHandle::new(get_runtime_handle().spawn(async move { // Send the current state even if it hasn't changed right away. listener.on_update(initial); @@ -452,7 +452,7 @@ impl Timeline { let poll_end_event_content = UnstablePollEndEventContent::new(text, poll_start_event_id); let event_content = AnyMessageLikeEventContent::UnstablePollEnd(poll_end_event_content); - RUNTIME.spawn(async move { + get_runtime_handle().spawn(async move { if let Err(err) = self.inner.send(event_content).await { error!("unable to end poll: {err}"); } diff --git a/bindings/matrix-sdk-ffi/src/utils.rs b/bindings/matrix-sdk-ffi/src/utils.rs index 19e6a4a56..2ddec8095 100644 --- a/bindings/matrix-sdk-ffi/src/utils.rs +++ b/bindings/matrix-sdk-ffi/src/utils.rs @@ -14,7 +14,7 @@ use std::{mem::ManuallyDrop, ops::Deref}; -use async_compat::TOKIO1 as RUNTIME; +use async_compat::get_runtime_handle; use ruma::{MilliSecondsSinceUnixEpoch, UInt}; use tracing::warn; @@ -54,7 +54,7 @@ impl AsyncRuntimeDropped { impl Drop for AsyncRuntimeDropped { fn drop(&mut self) { - let _guard = RUNTIME.enter(); + let _guard = get_runtime_handle().enter(); // SAFETY: self.inner is never used again, which is the only requirement // for ManuallyDrop::drop to be used safely. unsafe { diff --git a/bindings/matrix-sdk-ffi/src/widget.rs b/bindings/matrix-sdk-ffi/src/widget.rs index 44aa4dbff..7cbd1a8db 100644 --- a/bindings/matrix-sdk-ffi/src/widget.rs +++ b/bindings/matrix-sdk-ffi/src/widget.rs @@ -1,5 +1,6 @@ use std::sync::{Arc, Mutex}; +use async_compat::get_runtime_handle; use language_tags::LanguageTag; use matrix_sdk::{ async_trait, @@ -8,7 +9,7 @@ use matrix_sdk::{ use ruma::events::MessageLikeEventType; use tracing::error; -use crate::{room::Room, RUNTIME}; +use crate::room::Room; #[derive(uniffi::Record)] pub struct WidgetDriverAndHandle { @@ -501,7 +502,7 @@ impl matrix_sdk::widget::CapabilitiesProvider for CapabilitiesProviderWrap { // This could require a prompt to the user. Ideally the callback // interface would just be async, but that's not supported yet so use // one of tokio's blocking task threads instead. - RUNTIME + get_runtime_handle() .spawn_blocking(move || this.acquire_capabilities(capabilities.into()).into()) .await // propagate panics from the blocking task