diff --git a/bindings/matrix-sdk-ffi/CHANGELOG.md b/bindings/matrix-sdk-ffi/CHANGELOG.md index 18f13a504..8cb468244 100644 --- a/bindings/matrix-sdk-ffi/CHANGELOG.md +++ b/bindings/matrix-sdk-ffi/CHANGELOG.md @@ -17,6 +17,8 @@ All notable changes to this project will be documented in this file. - Add `QrLoginProgress::SyncingSecrets` to indicate that secrets are being synced between the two devices. ([#5760](https://github.com/matrix-org/matrix-rust-sdk/pull/5760)) +- Add `Room::subscribe_to_send_queue_updates` to observe room send queue updates. + ([#5761](https://github.com/matrix-org/matrix-rust-sdk/pull/5761)) ### Features: diff --git a/bindings/matrix-sdk-ffi/src/room/mod.rs b/bindings/matrix-sdk-ffi/src/room/mod.rs index 8f5800247..e12e5a679 100644 --- a/bindings/matrix-sdk-ffi/src/room/mod.rs +++ b/bindings/matrix-sdk-ffi/src/room/mod.rs @@ -8,6 +8,7 @@ use matrix_sdk::{ edit::EditedContent, power_levels::RoomPowerLevelChanges, Room as SdkRoom, RoomMemberRole, TryFromReportedContentScoreError, }, + send_queue::RoomSendQueueUpdate as SdkRoomSendQueueUpdate, ComposerDraft as SdkComposerDraft, ComposerDraftType as SdkComposerDraftType, EncryptionState, PredecessorRoom as SdkPredecessorRoom, RoomHero as SdkRoomHero, RoomMemberships, RoomState, SuccessorRoom as SdkSuccessorRoom, @@ -25,7 +26,7 @@ use ruma::{ avatar::ImageInfo as RumaAvatarImageInfo, history_visibility::HistoryVisibility as RumaHistoryVisibility, join_rules::JoinRule as RumaJoinRule, message::RoomMessageEventContentWithoutRelation, - MediaSource, + MediaSource as RumaMediaSource, }, AnyMessageLikeEventContent, AnySyncTimelineEvent, }, @@ -38,17 +39,17 @@ use self::{power_levels::RoomPowerLevels, room_info::RoomInfo}; use crate::{ chunk_iterator::ChunkIterator, client::{JoinRule, RoomVisibility}, - error::{ClientError, MediaInfoError, NotYetImplemented, RoomError}, + error::{ClientError, MediaInfoError, NotYetImplemented, QueueWedgeError, RoomError}, event::TimelineEvent, identity_status_change::IdentityStatusChange, live_location_share::{LastLocation, LiveLocationShare}, room_member::{RoomMember, RoomMemberWithSenderInfo}, room_preview::RoomPreview, - ruma::{ImageInfo, LocationContent}, + ruma::{ImageInfo, LocationContent, MediaSource}, runtime::get_runtime_handle, timeline::{ configuration::{TimelineConfiguration, TimelineFilter}, - EventTimelineItem, LatestEventValue, ReceiptType, SendHandle, Timeline, + AbstractProgress, EventTimelineItem, LatestEventValue, ReceiptType, SendHandle, Timeline, }, utils::{u64_to_uint, AsyncRuntimeDropped}, TaskHandle, @@ -735,6 +736,37 @@ impl Room { self.inner.send_queue().set_enabled(enable); } + /// Subscribe to all send queue updates in this room. + /// + /// The given listener will be immediately called with + /// `RoomSendQueueUpdate::NewLocalEvent` for each local echo existing in + /// the queue. + pub async fn subscribe_to_send_queue_updates( + &self, + listener: Box, + ) -> Result, ClientError> { + let q = self.inner.send_queue(); + let (local_echoes, mut subscriber) = q.subscribe().await?; + + for local_echo in local_echoes { + listener.on_update(RoomSendQueueUpdate::NewLocalEvent { + transaction_id: local_echo.transaction_id.into(), + }); + } + + Ok(Arc::new(TaskHandle::new(get_runtime_handle().spawn(async move { + loop { + match subscriber.recv().await { + Ok(update) => match update.try_into() { + Ok(update) => listener.on_update(update), + Err(err) => error!("error when converting send queue update: {err}"), + }, + Err(err) => error!("error when listening for send queue updates: {err}"), + } + } + })))) + } + /// Store the given `ComposerDraft` in the state store using the current /// room id, as identifier. pub async fn save_composer_draft( @@ -1381,8 +1413,8 @@ impl TryFrom for RumaAvatarImageInfo { fn try_from(value: ImageInfo) -> Result { let thumbnail_url = if let Some(media_source) = value.thumbnail_source { match &media_source.as_ref().media_source { - MediaSource::Plain(mxc_uri) => Some(mxc_uri.clone()), - MediaSource::Encrypted(_) => return Err(MediaInfoError::InvalidField), + RumaMediaSource::Plain(mxc_uri) => Some(mxc_uri.clone()), + RumaMediaSource::Encrypted(_) => return Err(MediaInfoError::InvalidField), } } else { None @@ -1569,3 +1601,126 @@ impl From for PredecessorRoom { Self { room_id: value.room_id.to_string() } } } + +/// A listener to send queue updates in a specific room. +#[matrix_sdk_ffi_macros::export(callback_interface)] +pub trait SendQueueListener: SyncOutsideWasm + SendOutsideWasm { + /// Called every time the send queue dispatches an update for the given + /// room. + fn on_update(&self, update: RoomSendQueueUpdate); +} + +/// An update to a room send queue. +#[derive(uniffi::Enum)] +pub enum RoomSendQueueUpdate { + /// A new local event is being sent. + NewLocalEvent { + /// Transaction id used to identify this event. + transaction_id: String, + }, + + /// A local event that hadn't been sent to the server yet has been cancelled + /// before sending. + CancelledLocalEvent { + /// Transaction id used to identify this event. + transaction_id: String, + }, + + /// A local event's content has been replaced with something else. + ReplacedLocalEvent { + /// Transaction id used to identify this event. + transaction_id: String, + }, + + /// An error happened when an event was being sent. + /// + /// The event has not been removed from the queue. All the send queues + /// will be disabled after this happens, and must be manually re-enabled. + SendError { + /// Transaction id used to identify this event. + transaction_id: String, + /// Error received while sending the event. + error: QueueWedgeError, + /// Whether the error is considered recoverable or not. + /// + /// An error that's recoverable will disable the room's send queue, + /// while an unrecoverable error will be parked, until the user + /// decides to cancel sending it. + is_recoverable: bool, + }, + + /// The event has been unwedged and sending is now being retried. + RetryEvent { + /// Transaction id used to identify this event. + transaction_id: String, + }, + + /// The event has been sent to the server, and the query returned + /// successfully. + SentEvent { + /// Transaction id used to identify this event. + transaction_id: String, + /// Received event id from the send response. + event_id: String, + }, + + /// A media upload (consisting of a file and possibly a thumbnail) has made + /// progress. + MediaUpload { + /// The media event this uploaded media relates to. + related_to: String, + + /// The final media source for the file if it has finished uploading. + file: Option>, + + /// The index of the media within the transaction. A file and its + /// thumbnail share the same index. Will always be 0 for non-gallery + /// media uploads. + index: u64, + + /// The combined upload progress across the file and, if existing, its + /// thumbnail. For gallery uploads, the progress is reported per indexed + /// gallery item. + progress: AbstractProgress, + }, +} + +impl TryFrom for RoomSendQueueUpdate { + type Error = ClientError; + + fn try_from(value: SdkRoomSendQueueUpdate) -> std::result::Result { + Ok(match value { + SdkRoomSendQueueUpdate::CancelledLocalEvent { transaction_id } => { + Self::CancelledLocalEvent { transaction_id: transaction_id.into() } + } + SdkRoomSendQueueUpdate::MediaUpload { related_to, file, index, progress } => { + Self::MediaUpload { + related_to: related_to.into(), + file: file.map(|source| source.try_into().map(Arc::new)).transpose()?, + index, + progress: progress.into(), + } + } + SdkRoomSendQueueUpdate::NewLocalEvent(local_echo) => { + Self::NewLocalEvent { transaction_id: local_echo.transaction_id.into() } + } + SdkRoomSendQueueUpdate::ReplacedLocalEvent { transaction_id, .. } => { + Self::ReplacedLocalEvent { transaction_id: transaction_id.into() } + } + SdkRoomSendQueueUpdate::RetryEvent { transaction_id } => { + Self::RetryEvent { transaction_id: transaction_id.into() } + } + SdkRoomSendQueueUpdate::SendError { transaction_id, error, is_recoverable } => { + let as_queue_wedge_error: matrix_sdk::QueueWedgeError = (&*error).into(); + Self::SendError { + transaction_id: transaction_id.into(), + error: as_queue_wedge_error.into(), + is_recoverable, + } + } + SdkRoomSendQueueUpdate::SentEvent { transaction_id, event_id } => { + Self::SentEvent { transaction_id: transaction_id.into(), event_id: event_id.into() } + } + }) + } +}