mirror of
https://github.com/matrix-org/matrix-rust-sdk.git
synced 2026-05-04 14:04:40 -04:00
feat(ffi): add a function to setup a lightweight tokio runtime
Creating many threads may use a bit of memory: on a machine with N devices, exactly N*2 MB of memory may be consumed. That might be a lot for a NSE process on iOS, which can only have up to 16 MB of RAM allocated for it. For this case, we introduce a new FFI method `setup_lightweight_tokio_runtime` which will spawn at most 4 worker threads and 1 blocking thread. This should be sufficient for most use cases.
This commit is contained in:
@@ -60,7 +60,7 @@ allow-git = [
|
||||
# A patch override for the bindings: https://github.com/rodrimati1992/const_panic/pull/10
|
||||
"https://github.com/jplatte/const_panic",
|
||||
# A patch override for the bindings: https://github.com/smol-rs/async-compat/pull/22
|
||||
"https://github.com/jplatte/async-compat",
|
||||
"https://github.com/element-hq/async-compat",
|
||||
# We can release vodozemac whenever we need but let's not block development
|
||||
# on releases.
|
||||
"https://github.com/matrix-org/vodozemac",
|
||||
|
||||
4
Cargo.lock
generated
4
Cargo.lock
generated
@@ -287,8 +287,8 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "async-compat"
|
||||
version = "0.2.2"
|
||||
source = "git+https://github.com/jplatte/async-compat?rev=16dc8597ec09a6102d58d4e7b67714a35dd0ecb8#16dc8597ec09a6102d58d4e7b67714a35dd0ecb8"
|
||||
version = "0.2.5"
|
||||
source = "git+https://github.com/element-hq/async-compat?rev=5a27c8b290f1f1dcfc0c4ec22c464e38528aa591#5a27c8b290f1f1dcfc0c4ec22c464e38528aa591"
|
||||
dependencies = [
|
||||
"futures-core",
|
||||
"futures-io",
|
||||
|
||||
@@ -147,7 +147,7 @@ lto = false
|
||||
debug = true
|
||||
|
||||
[patch.crates-io]
|
||||
async-compat = { git = "https://github.com/jplatte/async-compat", rev = "16dc8597ec09a6102d58d4e7b67714a35dd0ecb8" }
|
||||
async-compat = { git = "https://github.com/element-hq/async-compat", rev = "5a27c8b290f1f1dcfc0c4ec22c464e38528aa591" }
|
||||
const_panic = { git = "https://github.com/jplatte/const_panic", rev = "9024a4cb3eac45c1d2d980f17aaee287b17be498" }
|
||||
# Needed to fix rotation log issue on Android (https://github.com/tokio-rs/tracing/issues/2937)
|
||||
tracing = { git = "https://github.com/element-hq/tracing.git", rev = "ca9431f74d37c9d3b5e6a9f35b2c706711dab7dd" }
|
||||
|
||||
@@ -23,7 +23,7 @@ vergen = { version = "8.1.3", features = ["build", "git", "gitcl"] }
|
||||
[dependencies]
|
||||
anyhow = { workspace = true }
|
||||
as_variant = { workspace = true }
|
||||
async-compat = "0.2.1"
|
||||
async-compat = "0.2.5"
|
||||
eyeball-im = { workspace = true }
|
||||
extension-trait = "1.0.1"
|
||||
futures-util = { workspace = true }
|
||||
|
||||
@@ -5,6 +5,7 @@ use std::{
|
||||
};
|
||||
|
||||
use anyhow::{anyhow, Context as _};
|
||||
use async_compat::get_runtime_handle;
|
||||
use matrix_sdk::{
|
||||
authentication::oidc::{
|
||||
registrations::ClientId, requests::account_management::AccountManagementActionFull,
|
||||
@@ -68,7 +69,7 @@ use tokio::sync::broadcast::error::RecvError;
|
||||
use tracing::{debug, error};
|
||||
use url::Url;
|
||||
|
||||
use super::{room::Room, session_verification::SessionVerificationController, RUNTIME};
|
||||
use super::{room::Room, session_verification::SessionVerificationController};
|
||||
use crate::{
|
||||
authentication::{HomeserverLoginDetails, OidcConfiguration, OidcError, SsoError, SsoHandler},
|
||||
client,
|
||||
@@ -500,7 +501,7 @@ impl Client {
|
||||
let q = self.inner.send_queue();
|
||||
let mut subscriber = q.subscribe_errors();
|
||||
|
||||
Arc::new(TaskHandle::new(RUNTIME.spawn(async move {
|
||||
Arc::new(TaskHandle::new(get_runtime_handle().spawn(async move {
|
||||
// Respawn tasks for rooms that had unsent events. At this point we've just
|
||||
// created the subscriber, so it'll be notified about errors.
|
||||
q.respawn_tasks_for_rooms_with_unsent_requests().await;
|
||||
@@ -580,7 +581,7 @@ impl Client {
|
||||
delegate.map(|delegate| {
|
||||
let mut session_change_receiver = self.inner.subscribe_to_session_changes();
|
||||
let client_clone = self.clone();
|
||||
let session_change_task = RUNTIME.spawn(async move {
|
||||
let session_change_task = get_runtime_handle().spawn(async move {
|
||||
loop {
|
||||
match session_change_receiver.recv().await {
|
||||
Ok(session_change) => client_clone.process_session_change(session_change),
|
||||
@@ -666,7 +667,9 @@ impl Client {
|
||||
|
||||
/// Retrieves an avatar cached from a previous call to [`Self::avatar_url`].
|
||||
pub fn cached_avatar_url(&self) -> Result<Option<String>, ClientError> {
|
||||
Ok(RUNTIME.block_on(self.inner.account().get_cached_avatar_url())?.map(Into::into))
|
||||
Ok(get_runtime_handle()
|
||||
.block_on(self.inner.account().get_cached_avatar_url())?
|
||||
.map(Into::into))
|
||||
}
|
||||
|
||||
pub fn device_id(&self) -> Result<String, ClientError> {
|
||||
@@ -712,7 +715,7 @@ impl Client {
|
||||
|
||||
if let Some(progress_watcher) = progress_watcher {
|
||||
let mut subscriber = request.subscribe_to_send_progress();
|
||||
RUNTIME.spawn(async move {
|
||||
get_runtime_handle().spawn(async move {
|
||||
while let Some(progress) = subscriber.next().await {
|
||||
progress_watcher.transmission_progress(progress.into());
|
||||
}
|
||||
@@ -929,7 +932,7 @@ impl Client {
|
||||
}
|
||||
|
||||
pub fn get_notification_settings(&self) -> Arc<NotificationSettings> {
|
||||
let inner = RUNTIME.block_on(self.inner.notification_settings());
|
||||
let inner = get_runtime_handle().block_on(self.inner.notification_settings());
|
||||
|
||||
Arc::new(NotificationSettings::new((*self.inner).clone(), inner))
|
||||
}
|
||||
@@ -974,7 +977,7 @@ impl Client {
|
||||
listener: Box<dyn IgnoredUsersListener>,
|
||||
) -> Arc<TaskHandle> {
|
||||
let mut subscriber = self.inner.subscribe_to_ignore_user_list_changes();
|
||||
Arc::new(TaskHandle::new(RUNTIME.spawn(async move {
|
||||
Arc::new(TaskHandle::new(get_runtime_handle().spawn(async move {
|
||||
while let Some(user_ids) = subscriber.next().await {
|
||||
listener.call(user_ids);
|
||||
}
|
||||
@@ -1283,7 +1286,7 @@ impl Client {
|
||||
fn process_session_change(&self, session_change: SessionChange) {
|
||||
if let Some(delegate) = self.delegate.read().unwrap().clone() {
|
||||
debug!("Applying session change: {session_change:?}");
|
||||
RUNTIME.spawn_blocking(move || match session_change {
|
||||
get_runtime_handle().spawn_blocking(move || match session_change {
|
||||
SessionChange::UnknownToken { soft_logout } => {
|
||||
delegate.did_receive_auth_error(soft_logout);
|
||||
}
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
use std::{fs, num::NonZeroUsize, path::Path, sync::Arc, time::Duration};
|
||||
|
||||
use async_compat::get_runtime_handle;
|
||||
use futures_util::StreamExt;
|
||||
use matrix_sdk::{
|
||||
authentication::oidc::qrcode::{self, DeviceCodeErrorResponseType, LoginFailureReason},
|
||||
@@ -22,7 +23,7 @@ use ruma::api::error::{DeserializationError, FromHttpResponseError};
|
||||
use tracing::{debug, error};
|
||||
use zeroize::Zeroizing;
|
||||
|
||||
use super::{client::Client, RUNTIME};
|
||||
use super::client::Client;
|
||||
use crate::{
|
||||
authentication::OidcConfiguration, client::ClientSessionDelegate, error::ClientError,
|
||||
helpers::unwrap_or_clone_arc, task_handle::TaskHandle,
|
||||
@@ -698,7 +699,7 @@ impl ClientBuilder {
|
||||
|
||||
// We create this task, which will get cancelled once it's dropped, just in case
|
||||
// the progress stream doesn't end.
|
||||
let _progress_task = TaskHandle::new(RUNTIME.spawn(async move {
|
||||
let _progress_task = TaskHandle::new(get_runtime_handle().spawn(async move {
|
||||
while let Some(state) = progress.next().await {
|
||||
progress_listener.on_update(state.into());
|
||||
}
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use async_compat::get_runtime_handle;
|
||||
use futures_util::StreamExt;
|
||||
use matrix_sdk::{
|
||||
encryption,
|
||||
@@ -9,7 +10,6 @@ use thiserror::Error;
|
||||
use tracing::{error, info};
|
||||
use zeroize::Zeroize;
|
||||
|
||||
use super::RUNTIME;
|
||||
use crate::{client::Client, error::ClientError, ruma::AuthData, task_handle::TaskHandle};
|
||||
|
||||
#[derive(uniffi::Object)]
|
||||
@@ -230,7 +230,7 @@ impl Encryption {
|
||||
pub fn backup_state_listener(&self, listener: Box<dyn BackupStateListener>) -> Arc<TaskHandle> {
|
||||
let mut stream = self.inner.backups().state_stream();
|
||||
|
||||
let stream_task = TaskHandle::new(RUNTIME.spawn(async move {
|
||||
let stream_task = TaskHandle::new(get_runtime_handle().spawn(async move {
|
||||
while let Some(state) = stream.next().await {
|
||||
let Ok(state) = state else { continue };
|
||||
listener.on_update(state.into());
|
||||
@@ -267,7 +267,7 @@ impl Encryption {
|
||||
) -> Arc<TaskHandle> {
|
||||
let mut stream = self.inner.recovery().state_stream();
|
||||
|
||||
let stream_task = TaskHandle::new(RUNTIME.spawn(async move {
|
||||
let stream_task = TaskHandle::new(get_runtime_handle().spawn(async move {
|
||||
while let Some(state) = stream.next().await {
|
||||
listener.on_update(state.into());
|
||||
}
|
||||
@@ -294,7 +294,7 @@ impl Encryption {
|
||||
let task = if let Some(listener) = progress_listener {
|
||||
let mut progress_stream = wait_for_steady_state.subscribe_to_progress();
|
||||
|
||||
Some(RUNTIME.spawn(async move {
|
||||
Some(get_runtime_handle().spawn(async move {
|
||||
while let Some(progress) = progress_stream.next().await {
|
||||
let Ok(progress) = progress else { continue };
|
||||
listener.on_update(progress.into());
|
||||
@@ -335,7 +335,7 @@ impl Encryption {
|
||||
|
||||
let mut progress_stream = enable.subscribe_to_progress();
|
||||
|
||||
let task = RUNTIME.spawn(async move {
|
||||
let task = get_runtime_handle().spawn(async move {
|
||||
while let Some(progress) = progress_stream.next().await {
|
||||
let Ok(progress) = progress else { continue };
|
||||
progress_listener.on_update(progress.into());
|
||||
@@ -400,7 +400,7 @@ impl Encryption {
|
||||
) -> Arc<TaskHandle> {
|
||||
let mut subscriber = self.inner.verification_state();
|
||||
|
||||
Arc::new(TaskHandle::new(RUNTIME.spawn(async move {
|
||||
Arc::new(TaskHandle::new(get_runtime_handle().spawn(async move {
|
||||
while let Some(verification_state) = subscriber.next().await {
|
||||
listener.on_update(verification_state.into());
|
||||
}
|
||||
|
||||
@@ -34,7 +34,6 @@ mod tracing;
|
||||
mod utils;
|
||||
mod widget;
|
||||
|
||||
use async_compat::TOKIO1 as RUNTIME;
|
||||
use matrix_sdk::ruma::events::room::message::RoomMessageEventContentWithoutRelation;
|
||||
|
||||
use self::{
|
||||
|
||||
@@ -358,6 +358,38 @@ pub fn setup_tracing(config: TracingConfiguration) {
|
||||
.init();
|
||||
}
|
||||
|
||||
/// Set up a lightweight tokio runtime, for processes that have memory
|
||||
/// limitations (like the NSE process on iOS).
|
||||
#[matrix_sdk_ffi_macros::export]
|
||||
pub fn setup_lightweight_tokio_runtime() {
|
||||
async_compat::set_runtime_builder(Box::new(|| {
|
||||
eprintln!("spawning a lightweight tokio runtime");
|
||||
|
||||
// Get the number of available cores through the system, if possible.
|
||||
let num_available_cores =
|
||||
std::thread::available_parallelism().map(|n| n.get()).unwrap_or(1);
|
||||
|
||||
// The number of worker threads will be either that or 4, whichever is smaller.
|
||||
let num_worker_threads = num_available_cores.min(4);
|
||||
|
||||
// Chosen by a fair dice roll.
|
||||
let num_blocking_threads = 2;
|
||||
|
||||
// 1 MiB of memory per worker thread. Should be enough for everyone™.
|
||||
let max_memory_bytes = 1024 * 1024;
|
||||
|
||||
let mut builder = tokio::runtime::Builder::new_multi_thread();
|
||||
|
||||
builder
|
||||
.enable_all()
|
||||
.worker_threads(num_worker_threads)
|
||||
.thread_stack_size(max_memory_bytes)
|
||||
.max_blocking_threads(num_blocking_threads);
|
||||
|
||||
builder
|
||||
}));
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::build_tracing_filter;
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
use std::{collections::HashMap, pin::pin, sync::Arc};
|
||||
|
||||
use anyhow::{Context, Result};
|
||||
use async_compat::get_runtime_handle;
|
||||
use futures_util::{pin_mut, StreamExt};
|
||||
use matrix_sdk::{
|
||||
crypto::LocalTrust,
|
||||
@@ -30,7 +31,6 @@ use ruma::{
|
||||
use tokio::sync::RwLock;
|
||||
use tracing::{error, warn};
|
||||
|
||||
use super::RUNTIME;
|
||||
use crate::{
|
||||
chunk_iterator::ChunkIterator,
|
||||
client::{JoinRule, RoomVisibility},
|
||||
@@ -111,7 +111,7 @@ impl Room {
|
||||
}
|
||||
|
||||
pub fn is_direct(&self) -> bool {
|
||||
RUNTIME.block_on(self.inner.is_direct()).unwrap_or(false)
|
||||
get_runtime_handle().block_on(self.inner.is_direct()).unwrap_or(false)
|
||||
}
|
||||
|
||||
pub fn is_public(&self) -> bool {
|
||||
@@ -292,7 +292,7 @@ impl Room {
|
||||
listener: Box<dyn RoomInfoListener>,
|
||||
) -> Arc<TaskHandle> {
|
||||
let mut subscriber = self.inner.subscribe_info();
|
||||
Arc::new(TaskHandle::new(RUNTIME.spawn(async move {
|
||||
Arc::new(TaskHandle::new(get_runtime_handle().spawn(async move {
|
||||
while subscriber.next().await.is_some() {
|
||||
match self.room_info().await {
|
||||
Ok(room_info) => listener.call(room_info),
|
||||
@@ -596,7 +596,7 @@ impl Room {
|
||||
self: Arc<Self>,
|
||||
listener: Box<dyn TypingNotificationsListener>,
|
||||
) -> Arc<TaskHandle> {
|
||||
Arc::new(TaskHandle::new(RUNTIME.spawn(async move {
|
||||
Arc::new(TaskHandle::new(get_runtime_handle().spawn(async move {
|
||||
let (_event_handler_drop_guard, mut subscriber) =
|
||||
self.inner.subscribe_to_typing_notifications();
|
||||
while let Ok(typing_user_ids) = subscriber.recv().await {
|
||||
@@ -612,7 +612,7 @@ impl Room {
|
||||
listener: Box<dyn IdentityStatusChangeListener>,
|
||||
) -> Arc<TaskHandle> {
|
||||
let room = self.inner.clone();
|
||||
Arc::new(TaskHandle::new(RUNTIME.spawn(async move {
|
||||
Arc::new(TaskHandle::new(get_runtime_handle().spawn(async move {
|
||||
let status_changes = room.subscribe_to_identity_status_changes().await;
|
||||
if let Ok(status_changes) = status_changes {
|
||||
// TODO: what to do with failures?
|
||||
@@ -880,7 +880,7 @@ impl Room {
|
||||
) -> Result<Arc<TaskHandle>, ClientError> {
|
||||
let (stream, seen_ids_cleanup_handle) = self.inner.subscribe_to_knock_requests().await?;
|
||||
|
||||
let handle = Arc::new(TaskHandle::new(RUNTIME.spawn(async move {
|
||||
let handle = Arc::new(TaskHandle::new(get_runtime_handle().spawn(async move {
|
||||
pin_mut!(stream);
|
||||
while let Some(requests) = stream.next().await {
|
||||
listener.call(requests.into_iter().map(Into::into).collect());
|
||||
@@ -1030,7 +1030,7 @@ impl Room {
|
||||
) -> Arc<TaskHandle> {
|
||||
let room = self.inner.clone();
|
||||
|
||||
Arc::new(TaskHandle::new(RUNTIME.spawn(async move {
|
||||
Arc::new(TaskHandle::new(get_runtime_handle().spawn(async move {
|
||||
let subscription = room.observe_live_location_shares();
|
||||
let mut stream = subscription.subscribe();
|
||||
let mut pinned_stream = pin!(stream);
|
||||
|
||||
@@ -15,13 +15,13 @@
|
||||
|
||||
use std::{fmt::Debug, sync::Arc};
|
||||
|
||||
use async_compat::get_runtime_handle;
|
||||
use eyeball_im::VectorDiff;
|
||||
use futures_util::StreamExt;
|
||||
use matrix_sdk::room_directory_search::RoomDirectorySearch as SdkRoomDirectorySearch;
|
||||
use ruma::ServerName;
|
||||
use tokio::sync::RwLock;
|
||||
|
||||
use super::RUNTIME;
|
||||
use crate::{error::ClientError, task_handle::TaskHandle};
|
||||
|
||||
#[derive(uniffi::Enum)]
|
||||
@@ -137,7 +137,7 @@ impl RoomDirectorySearch {
|
||||
) -> Arc<TaskHandle> {
|
||||
let (initial_values, mut stream) = self.inner.read().await.results();
|
||||
|
||||
Arc::new(TaskHandle::new(RUNTIME.spawn(async move {
|
||||
Arc::new(TaskHandle::new(get_runtime_handle().spawn(async move {
|
||||
listener.on_update(vec![RoomDirectorySearchEntryUpdate::Reset {
|
||||
values: initial_values.into_iter().map(Into::into).collect(),
|
||||
}]);
|
||||
|
||||
@@ -2,6 +2,7 @@
|
||||
|
||||
use std::{fmt::Debug, mem::MaybeUninit, ptr::addr_of_mut, sync::Arc, time::Duration};
|
||||
|
||||
use async_compat::get_runtime_handle;
|
||||
use eyeball_im::VectorDiff;
|
||||
use futures_util::{pin_mut, StreamExt, TryFutureExt};
|
||||
use matrix_sdk::ruma::{
|
||||
@@ -28,7 +29,7 @@ use crate::{
|
||||
room_preview::RoomPreview,
|
||||
timeline::{configuration::TimelineEventTypeFilter, EventTimelineItem, Timeline},
|
||||
utils::AsyncRuntimeDropped,
|
||||
TaskHandle, RUNTIME,
|
||||
TaskHandle,
|
||||
};
|
||||
|
||||
#[derive(Debug, thiserror::Error, uniffi::Error)]
|
||||
@@ -91,7 +92,7 @@ impl RoomListService {
|
||||
fn state(&self, listener: Box<dyn RoomListServiceStateListener>) -> Arc<TaskHandle> {
|
||||
let state_stream = self.inner.state();
|
||||
|
||||
Arc::new(TaskHandle::new(RUNTIME.spawn(async move {
|
||||
Arc::new(TaskHandle::new(get_runtime_handle().spawn(async move {
|
||||
pin_mut!(state_stream);
|
||||
|
||||
while let Some(state) = state_stream.next().await {
|
||||
@@ -127,7 +128,7 @@ impl RoomListService {
|
||||
Duration::from_millis(delay_before_hiding_in_ms.into()),
|
||||
);
|
||||
|
||||
Arc::new(TaskHandle::new(RUNTIME.spawn(async move {
|
||||
Arc::new(TaskHandle::new(get_runtime_handle().spawn(async move {
|
||||
pin_mut!(sync_indicator_stream);
|
||||
|
||||
while let Some(sync_indicator) = sync_indicator_stream.next().await {
|
||||
@@ -166,7 +167,7 @@ impl RoomList {
|
||||
|
||||
Ok(RoomListLoadingStateResult {
|
||||
state: loading_state.get().into(),
|
||||
state_stream: Arc::new(TaskHandle::new(RUNTIME.spawn(async move {
|
||||
state_stream: Arc::new(TaskHandle::new(get_runtime_handle().spawn(async move {
|
||||
pin_mut!(loading_state);
|
||||
|
||||
while let Some(loading_state) = loading_state.next().await {
|
||||
@@ -236,7 +237,7 @@ impl RoomList {
|
||||
let dynamic_entries_controller =
|
||||
Arc::new(RoomListDynamicEntriesController::new(dynamic_entries_controller));
|
||||
|
||||
let entries_stream = Arc::new(TaskHandle::new(RUNTIME.spawn(async move {
|
||||
let entries_stream = Arc::new(TaskHandle::new(get_runtime_handle().spawn(async move {
|
||||
pin_mut!(entries_stream);
|
||||
|
||||
while let Some(diffs) = entries_stream.next().await {
|
||||
@@ -557,7 +558,7 @@ impl RoomListItem {
|
||||
}
|
||||
|
||||
fn is_direct(&self) -> bool {
|
||||
RUNTIME.block_on(self.inner.inner_room().is_direct()).unwrap_or(false)
|
||||
get_runtime_handle().block_on(self.inner.inner_room().is_direct()).unwrap_or(false)
|
||||
}
|
||||
|
||||
fn canonical_alias(&self) -> Option<String> {
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
use std::sync::{Arc, RwLock};
|
||||
|
||||
use async_compat::get_runtime_handle;
|
||||
use futures_util::StreamExt;
|
||||
use matrix_sdk::{
|
||||
encryption::{
|
||||
@@ -13,7 +14,6 @@ use matrix_sdk::{
|
||||
use ruma::UserId;
|
||||
use tracing::{error, warn};
|
||||
|
||||
use super::RUNTIME;
|
||||
use crate::{client::UserProfile, error::ClientError, utils::Timestamp};
|
||||
|
||||
#[derive(uniffi::Object)]
|
||||
@@ -165,7 +165,8 @@ impl SessionVerificationController {
|
||||
}
|
||||
|
||||
let delegate = self.delegate.clone();
|
||||
RUNTIME.spawn(Self::listen_to_sas_verification_changes(verification, delegate));
|
||||
get_runtime_handle()
|
||||
.spawn(Self::listen_to_sas_verification_changes(verification, delegate));
|
||||
}
|
||||
_ => {
|
||||
if let Some(delegate) = &*self.delegate.read().unwrap() {
|
||||
@@ -290,7 +291,7 @@ impl SessionVerificationController {
|
||||
|
||||
*self.verification_request.write().unwrap() = Some(verification_request.clone());
|
||||
|
||||
RUNTIME.spawn(Self::listen_to_verification_request_changes(
|
||||
get_runtime_handle().spawn(Self::listen_to_verification_request_changes(
|
||||
verification_request,
|
||||
self.sas_verification.clone(),
|
||||
self.delegate.clone(),
|
||||
@@ -322,7 +323,7 @@ impl SessionVerificationController {
|
||||
}
|
||||
|
||||
let delegate = delegate.clone();
|
||||
RUNTIME.spawn(Self::listen_to_sas_verification_changes(
|
||||
get_runtime_handle().spawn(Self::listen_to_sas_verification_changes(
|
||||
verification,
|
||||
delegate,
|
||||
));
|
||||
|
||||
@@ -14,6 +14,7 @@
|
||||
|
||||
use std::{fmt::Debug, sync::Arc, time::Duration};
|
||||
|
||||
use async_compat::get_runtime_handle;
|
||||
use futures_util::pin_mut;
|
||||
use matrix_sdk::{crypto::types::events::UtdCause, Client};
|
||||
use matrix_sdk_ui::{
|
||||
@@ -29,7 +30,6 @@ use tracing::error;
|
||||
|
||||
use crate::{
|
||||
error::ClientError, helpers::unwrap_or_clone_arc, room_list::RoomListService, TaskHandle,
|
||||
RUNTIME,
|
||||
};
|
||||
|
||||
#[derive(uniffi::Enum)]
|
||||
@@ -84,7 +84,7 @@ impl SyncService {
|
||||
pub fn state(&self, listener: Box<dyn SyncServiceStateObserver>) -> Arc<TaskHandle> {
|
||||
let state_stream = self.inner.state();
|
||||
|
||||
Arc::new(TaskHandle::new(RUNTIME.spawn(async move {
|
||||
Arc::new(TaskHandle::new(get_runtime_handle().spawn(async move {
|
||||
pin_mut!(state_stream);
|
||||
|
||||
while let Some(state) = state_stream.next().await {
|
||||
|
||||
@@ -16,6 +16,7 @@ use std::{collections::HashMap, fmt::Write as _, fs, panic, sync::Arc};
|
||||
|
||||
use anyhow::{Context, Result};
|
||||
use as_variant::as_variant;
|
||||
use async_compat::get_runtime_handle;
|
||||
use content::{InReplyToDetails, RepliedToEventDetails};
|
||||
use eyeball_im::VectorDiff;
|
||||
use futures_util::{pin_mut, StreamExt as _};
|
||||
@@ -73,7 +74,6 @@ use crate::{
|
||||
},
|
||||
task_handle::TaskHandle,
|
||||
utils::Timestamp,
|
||||
RUNTIME,
|
||||
};
|
||||
|
||||
pub mod configuration;
|
||||
@@ -124,7 +124,7 @@ impl Timeline {
|
||||
.formatted_caption(formatted_caption)
|
||||
.mentions(params.mentions.map(Into::into));
|
||||
|
||||
let handle = SendAttachmentJoinHandle::new(RUNTIME.spawn(async move {
|
||||
let handle = SendAttachmentJoinHandle::new(get_runtime_handle().spawn(async move {
|
||||
let mut request =
|
||||
self.inner.send_attachment(params.filename, mime_type, attachment_config);
|
||||
|
||||
@@ -134,7 +134,7 @@ impl Timeline {
|
||||
|
||||
if let Some(progress_watcher) = progress_watcher {
|
||||
let mut subscriber = request.subscribe_to_send_progress();
|
||||
RUNTIME.spawn(async move {
|
||||
get_runtime_handle().spawn(async move {
|
||||
while let Some(progress) = subscriber.next().await {
|
||||
progress_watcher.transmission_progress(progress.into());
|
||||
}
|
||||
@@ -215,7 +215,7 @@ impl Timeline {
|
||||
pub async fn add_listener(&self, listener: Box<dyn TimelineListener>) -> Arc<TaskHandle> {
|
||||
let (timeline_items, timeline_stream) = self.inner.subscribe().await;
|
||||
|
||||
Arc::new(TaskHandle::new(RUNTIME.spawn(async move {
|
||||
Arc::new(TaskHandle::new(get_runtime_handle().spawn(async move {
|
||||
pin_mut!(timeline_stream);
|
||||
|
||||
// It's important that the initial items are passed *before* we forward the
|
||||
@@ -237,7 +237,7 @@ impl Timeline {
|
||||
}
|
||||
|
||||
pub fn retry_decryption(self: Arc<Self>, session_ids: Vec<String>) {
|
||||
RUNTIME.spawn(async move {
|
||||
get_runtime_handle().spawn(async move {
|
||||
self.inner.retry_decryption(&session_ids).await;
|
||||
});
|
||||
}
|
||||
@@ -256,7 +256,7 @@ impl Timeline {
|
||||
.await
|
||||
.context("can't subscribe to the back-pagination status on a focused timeline")?;
|
||||
|
||||
Ok(Arc::new(TaskHandle::new(RUNTIME.spawn(async move {
|
||||
Ok(Arc::new(TaskHandle::new(get_runtime_handle().spawn(async move {
|
||||
// Send the current state even if it hasn't changed right away.
|
||||
listener.on_update(initial);
|
||||
|
||||
@@ -452,7 +452,7 @@ impl Timeline {
|
||||
let poll_end_event_content = UnstablePollEndEventContent::new(text, poll_start_event_id);
|
||||
let event_content = AnyMessageLikeEventContent::UnstablePollEnd(poll_end_event_content);
|
||||
|
||||
RUNTIME.spawn(async move {
|
||||
get_runtime_handle().spawn(async move {
|
||||
if let Err(err) = self.inner.send(event_content).await {
|
||||
error!("unable to end poll: {err}");
|
||||
}
|
||||
|
||||
@@ -14,7 +14,7 @@
|
||||
|
||||
use std::{mem::ManuallyDrop, ops::Deref};
|
||||
|
||||
use async_compat::TOKIO1 as RUNTIME;
|
||||
use async_compat::get_runtime_handle;
|
||||
use ruma::{MilliSecondsSinceUnixEpoch, UInt};
|
||||
use tracing::warn;
|
||||
|
||||
@@ -54,7 +54,7 @@ impl<T> AsyncRuntimeDropped<T> {
|
||||
|
||||
impl<T> Drop for AsyncRuntimeDropped<T> {
|
||||
fn drop(&mut self) {
|
||||
let _guard = RUNTIME.enter();
|
||||
let _guard = get_runtime_handle().enter();
|
||||
// SAFETY: self.inner is never used again, which is the only requirement
|
||||
// for ManuallyDrop::drop to be used safely.
|
||||
unsafe {
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
use std::sync::{Arc, Mutex};
|
||||
|
||||
use async_compat::get_runtime_handle;
|
||||
use language_tags::LanguageTag;
|
||||
use matrix_sdk::{
|
||||
async_trait,
|
||||
@@ -8,7 +9,7 @@ use matrix_sdk::{
|
||||
use ruma::events::MessageLikeEventType;
|
||||
use tracing::error;
|
||||
|
||||
use crate::{room::Room, RUNTIME};
|
||||
use crate::room::Room;
|
||||
|
||||
#[derive(uniffi::Record)]
|
||||
pub struct WidgetDriverAndHandle {
|
||||
@@ -501,7 +502,7 @@ impl matrix_sdk::widget::CapabilitiesProvider for CapabilitiesProviderWrap {
|
||||
// This could require a prompt to the user. Ideally the callback
|
||||
// interface would just be async, but that's not supported yet so use
|
||||
// one of tokio's blocking task threads instead.
|
||||
RUNTIME
|
||||
get_runtime_handle()
|
||||
.spawn_blocking(move || this.acquire_capabilities(capabilities.into()).into())
|
||||
.await
|
||||
// propagate panics from the blocking task
|
||||
|
||||
Reference in New Issue
Block a user