fix(wasm): don't use tokio::time::{timeout,sleep} (#4573)

Tokio timeout and sleep don't work on wasm so provide alternative versions

---------

Signed-off-by: Manmeet Singh <manmeetmann2003@gmail.com>
Signed-off-by: Andy Balaam <andy.balaam@matrix.org>
Co-authored-by: Andy Balaam <andy.balaam@matrix.org>
This commit is contained in:
maan2003
2025-01-23 14:27:11 +05:30
committed by GitHub
parent 2d0f873342
commit 4c4dd03411
13 changed files with 75 additions and 31 deletions

View File

@@ -28,6 +28,7 @@ pub mod failures_cache;
pub mod linked_chunk;
pub mod locks;
pub mod ring_buffer;
pub mod sleep;
pub mod store_locks;
pub mod timeout;
pub mod tracing_timer;

View File

@@ -0,0 +1,49 @@
// Copyright 2024 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::time::Duration;
/// Sleep for the specified duration.
///
/// This is a cross-platform sleep implementation that works on both wasm32 and
/// non-wasm32 targets.
pub async fn sleep(duration: Duration) {
#[cfg(not(target_arch = "wasm32"))]
tokio::time::sleep(duration).await;
#[cfg(target_arch = "wasm32")]
gloo_timers::future::TimeoutFuture::new(u32::try_from(duration.as_millis()).unwrap_or_else(
|_| {
tracing::error!("Sleep duration too long, sleeping for u32::MAX ms");
u32::MAX
},
))
.await;
}
#[cfg(test)]
mod tests {
use matrix_sdk_test_macros::async_test;
use super::*;
#[cfg(target_arch = "wasm32")]
wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);
#[async_test]
async fn test_sleep() {
// Just test that it doesn't panic
sleep(Duration::from_millis(1)).await;
}
}

View File

@@ -46,11 +46,12 @@ use std::{
time::Duration,
};
use tokio::{sync::Mutex, time::sleep};
use tokio::sync::Mutex;
use tracing::{debug, error, info, instrument, trace};
use crate::{
executor::{spawn, JoinHandle},
sleep::sleep,
SendOutsideWasm,
};

View File

@@ -41,7 +41,7 @@ impl Error for ElapsedError {}
/// an error.
pub async fn timeout<F, T>(future: F, duration: Duration) -> Result<T, ElapsedError>
where
F: Future<Output = T> + Unpin,
F: Future<Output = T>,
{
#[cfg(not(target_arch = "wasm32"))]
return tokio_timeout(duration, future).await.map_err(|_| ElapsedError(()));
@@ -51,7 +51,7 @@ where
let timeout_future =
TimeoutFuture::new(u32::try_from(duration.as_millis()).expect("Overlong duration"));
match select(future, timeout_future).await {
match select(std::pin::pin!(future), timeout_future).await {
Either::Left((res, _)) => Ok(res),
Either::Right((_, _)) => Err(ElapsedError(())),
}

View File

@@ -31,7 +31,7 @@ use std::{pin::Pin, time::Duration};
use async_stream::stream;
use futures_core::stream::Stream;
use futures_util::{pin_mut, StreamExt};
use matrix_sdk::{Client, SlidingSync, LEASE_DURATION_MS};
use matrix_sdk::{sleep::sleep, Client, SlidingSync, LEASE_DURATION_MS};
use matrix_sdk_base::sliding_sync::http;
use ruma::assign;
use tokio::sync::OwnedMutexGuard;
@@ -174,7 +174,7 @@ impl EncryptionSyncService {
LEASE_DURATION_MS
);
tokio::time::sleep(Duration::from_millis(LEASE_DURATION_MS.into())).await;
sleep(Duration::from_millis(LEASE_DURATION_MS.into())).await;
lock_guard = self
.client

View File

@@ -18,7 +18,9 @@ use std::{
};
use futures_util::{pin_mut, StreamExt as _};
use matrix_sdk::{room::Room, Client, ClientBuildError, SlidingSyncList, SlidingSyncMode};
use matrix_sdk::{
room::Room, sleep::sleep, Client, ClientBuildError, SlidingSyncList, SlidingSyncMode,
};
use matrix_sdk_base::{
deserialized_responses::TimelineEvent, sliding_sync::http, RoomState, StoreError,
};
@@ -212,7 +214,7 @@ impl NotificationClient {
for _ in 0..3 {
trace!("waiting for decryption…");
tokio::time::sleep(Duration::from_millis(wait)).await;
sleep(Duration::from_millis(wait)).await;
let new_event = room.decrypt_event(raw_event.cast_ref()).await?;

View File

@@ -63,8 +63,8 @@ use async_stream::stream;
use eyeball::Subscriber;
use futures_util::{pin_mut, Stream, StreamExt};
use matrix_sdk::{
event_cache::EventCacheError, Client, Error as SlidingSyncError, SlidingSync, SlidingSyncList,
SlidingSyncMode,
event_cache::EventCacheError, timeout::timeout, Client, Error as SlidingSyncError, SlidingSync,
SlidingSyncList, SlidingSyncMode,
};
use matrix_sdk_base::sliding_sync::http;
pub use room::*;
@@ -72,7 +72,6 @@ pub use room_list::*;
use ruma::{assign, directory::RoomTypeFilter, events::StateEventType, OwnedRoomId, RoomId, UInt};
pub use state::*;
use thiserror::Error;
use tokio::time::timeout;
use tracing::debug;
use crate::timeline;
@@ -328,7 +327,7 @@ impl RoomListService {
};
// `state.next().await` has a maximum of `yield_delay` time to execute…
let next_state = match timeout(yield_delay, state.next()).await {
let next_state = match timeout(state.next(), yield_delay).await {
// A new state has been received before `yield_delay` time. The new
// `sync_indicator` value won't be yielded.
Ok(next_state) => next_state,

View File

@@ -27,6 +27,7 @@ use growable_bloom_filter::{GrowableBloom, GrowableBloomBuilder};
use matrix_sdk::{
crypto::types::events::UtdCause,
executor::{spawn, JoinHandle},
sleep::sleep,
Client,
};
use matrix_sdk_base::{StateStoreDataKey, StateStoreDataValue, StoreError};
@@ -34,10 +35,7 @@ use ruma::{
time::{Duration, Instant},
EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedServerName, UserId,
};
use tokio::{
sync::{Mutex as AsyncMutex, MutexGuard},
time::sleep,
};
use tokio::sync::{Mutex as AsyncMutex, MutexGuard};
use tracing::error;
/// A generic interface which methods get called whenever we observe a

View File

@@ -692,12 +692,9 @@ impl Backups {
.upload_progress
.set(UploadState::Uploading(new_counts));
#[cfg(not(target_arch = "wasm32"))]
{
let delay =
self.client.inner.e2ee.backup_state.upload_delay.read().unwrap().to_owned();
tokio::time::sleep(delay).await;
}
let delay =
self.client.inner.e2ee.backup_state.upload_delay.read().unwrap().to_owned();
crate::sleep::sleep(delay).await;
Ok(())
}

View File

@@ -216,7 +216,7 @@ impl BackupDownloadTask {
) {
// Wait a bit, perhaps the room key will arrive in the meantime.
#[cfg(not(test))]
tokio::time::sleep(Duration::from_millis(Self::DOWNLOAD_DELAY_MILLIS)).await;
crate::sleep::sleep(Duration::from_millis(Self::DOWNLOAD_DELAY_MILLIS)).await;
// Now take the lock, and check that we still want to do a download. If we do,
// keep hold of a strong reference to the `Client`.

View File

@@ -17,9 +17,8 @@
use std::{future::Future, ops::ControlFlow, sync::Arc, time::Duration};
use eyeball::Subscriber;
use matrix_sdk_base::deserialized_responses::TimelineEvent;
use matrix_sdk_base::{deserialized_responses::TimelineEvent, timeout::timeout};
use matrix_sdk_common::linked_chunk::ChunkContent;
use tokio::time::timeout;
use tracing::{debug, instrument, trace};
use super::{
@@ -295,7 +294,7 @@ impl RoomPagination {
// Otherwise, wait for a notification that we received a previous-batch token.
// Note the state lock is released while doing so, allowing other tasks to write
// into the linked chunk.
let _ = timeout(wait_time, self.inner.pagination_batch_token_notifier.notified()).await;
let _ = timeout(self.inner.pagination_batch_token_notifier.notified(), wait_time).await;
let mut state = self.inner.state.write().await;

View File

@@ -23,6 +23,7 @@ use std::{
pub use matrix_sdk_base::sync::*;
use matrix_sdk_base::{
debug::{DebugInvitedRoom, DebugKnockedRoom, DebugListOfRawEventsNoId},
sleep::sleep,
sync::SyncResponse as BaseSyncResponse,
};
use ruma::{
@@ -299,11 +300,7 @@ impl Client {
}
async fn sleep() {
#[cfg(target_arch = "wasm32")]
gloo_timers::future::TimeoutFuture::new(1_000).await;
#[cfg(not(target_arch = "wasm32"))]
tokio::time::sleep(Duration::from_secs(1)).await;
sleep(Duration::from_secs(1)).await;
}
pub(crate) async fn sync_loop_helper(

View File

@@ -25,6 +25,7 @@ use matrix_sdk::{
events::room::message::{MessageType, RoomMessageEventContent},
MilliSecondsSinceUnixEpoch, OwnedRoomId, RoomId,
},
sleep::sleep,
AuthSession, Client, ServerName, SqliteCryptoStore, SqliteEventCacheStore, SqliteStateStore,
};
use matrix_sdk_ui::{
@@ -332,7 +333,7 @@ impl App {
let message = self.last_status_message.clone();
self.clear_status_message = Some(spawn(async move {
// Clear the status message in 4 seconds.
tokio::time::sleep(Duration::from_secs(4)).await;
sleep(Duration::from_secs(4)).await;
*message.lock().unwrap() = None;
}));