feat(ffi): add bindings for listening to room send queue updates

Signed-off-by: Johannes Marbach <n0-0ne+github@mailbox.org>
This commit is contained in:
Johannes Marbach
2025-10-08 14:20:35 +02:00
committed by Andy Balaam
parent cb3d281f8f
commit 9b485013e1
2 changed files with 163 additions and 6 deletions

View File

@@ -17,6 +17,8 @@ All notable changes to this project will be documented in this file.
- Add `QrLoginProgress::SyncingSecrets` to indicate that secrets are being synced between the two
devices.
([#5760](https://github.com/matrix-org/matrix-rust-sdk/pull/5760))
- Add `Room::subscribe_to_send_queue_updates` to observe room send queue updates.
([#5761](https://github.com/matrix-org/matrix-rust-sdk/pull/5761))
### Features:

View File

@@ -8,6 +8,7 @@ use matrix_sdk::{
edit::EditedContent, power_levels::RoomPowerLevelChanges, Room as SdkRoom, RoomMemberRole,
TryFromReportedContentScoreError,
},
send_queue::RoomSendQueueUpdate as SdkRoomSendQueueUpdate,
ComposerDraft as SdkComposerDraft, ComposerDraftType as SdkComposerDraftType, EncryptionState,
PredecessorRoom as SdkPredecessorRoom, RoomHero as SdkRoomHero, RoomMemberships, RoomState,
SuccessorRoom as SdkSuccessorRoom,
@@ -25,7 +26,7 @@ use ruma::{
avatar::ImageInfo as RumaAvatarImageInfo,
history_visibility::HistoryVisibility as RumaHistoryVisibility,
join_rules::JoinRule as RumaJoinRule, message::RoomMessageEventContentWithoutRelation,
MediaSource,
MediaSource as RumaMediaSource,
},
AnyMessageLikeEventContent, AnySyncTimelineEvent,
},
@@ -38,17 +39,17 @@ use self::{power_levels::RoomPowerLevels, room_info::RoomInfo};
use crate::{
chunk_iterator::ChunkIterator,
client::{JoinRule, RoomVisibility},
error::{ClientError, MediaInfoError, NotYetImplemented, RoomError},
error::{ClientError, MediaInfoError, NotYetImplemented, QueueWedgeError, RoomError},
event::TimelineEvent,
identity_status_change::IdentityStatusChange,
live_location_share::{LastLocation, LiveLocationShare},
room_member::{RoomMember, RoomMemberWithSenderInfo},
room_preview::RoomPreview,
ruma::{ImageInfo, LocationContent},
ruma::{ImageInfo, LocationContent, MediaSource},
runtime::get_runtime_handle,
timeline::{
configuration::{TimelineConfiguration, TimelineFilter},
EventTimelineItem, LatestEventValue, ReceiptType, SendHandle, Timeline,
AbstractProgress, EventTimelineItem, LatestEventValue, ReceiptType, SendHandle, Timeline,
},
utils::{u64_to_uint, AsyncRuntimeDropped},
TaskHandle,
@@ -735,6 +736,37 @@ impl Room {
self.inner.send_queue().set_enabled(enable);
}
/// Subscribe to all send queue updates in this room.
///
/// The given listener will be immediately called with
/// `RoomSendQueueUpdate::NewLocalEvent` for each local echo existing in
/// the queue.
pub async fn subscribe_to_send_queue_updates(
&self,
listener: Box<dyn SendQueueListener>,
) -> Result<Arc<TaskHandle>, ClientError> {
let q = self.inner.send_queue();
let (local_echoes, mut subscriber) = q.subscribe().await?;
for local_echo in local_echoes {
listener.on_update(RoomSendQueueUpdate::NewLocalEvent {
transaction_id: local_echo.transaction_id.into(),
});
}
Ok(Arc::new(TaskHandle::new(get_runtime_handle().spawn(async move {
loop {
match subscriber.recv().await {
Ok(update) => match update.try_into() {
Ok(update) => listener.on_update(update),
Err(err) => error!("error when converting send queue update: {err}"),
},
Err(err) => error!("error when listening for send queue updates: {err}"),
}
}
}))))
}
/// Store the given `ComposerDraft` in the state store using the current
/// room id, as identifier.
pub async fn save_composer_draft(
@@ -1381,8 +1413,8 @@ impl TryFrom<ImageInfo> for RumaAvatarImageInfo {
fn try_from(value: ImageInfo) -> Result<Self, MediaInfoError> {
let thumbnail_url = if let Some(media_source) = value.thumbnail_source {
match &media_source.as_ref().media_source {
MediaSource::Plain(mxc_uri) => Some(mxc_uri.clone()),
MediaSource::Encrypted(_) => return Err(MediaInfoError::InvalidField),
RumaMediaSource::Plain(mxc_uri) => Some(mxc_uri.clone()),
RumaMediaSource::Encrypted(_) => return Err(MediaInfoError::InvalidField),
}
} else {
None
@@ -1569,3 +1601,126 @@ impl From<SdkPredecessorRoom> for PredecessorRoom {
Self { room_id: value.room_id.to_string() }
}
}
/// A listener to send queue updates in a specific room.
#[matrix_sdk_ffi_macros::export(callback_interface)]
pub trait SendQueueListener: SyncOutsideWasm + SendOutsideWasm {
/// Called every time the send queue dispatches an update for the given
/// room.
fn on_update(&self, update: RoomSendQueueUpdate);
}
/// An update to a room send queue.
#[derive(uniffi::Enum)]
pub enum RoomSendQueueUpdate {
/// A new local event is being sent.
NewLocalEvent {
/// Transaction id used to identify this event.
transaction_id: String,
},
/// A local event that hadn't been sent to the server yet has been cancelled
/// before sending.
CancelledLocalEvent {
/// Transaction id used to identify this event.
transaction_id: String,
},
/// A local event's content has been replaced with something else.
ReplacedLocalEvent {
/// Transaction id used to identify this event.
transaction_id: String,
},
/// An error happened when an event was being sent.
///
/// The event has not been removed from the queue. All the send queues
/// will be disabled after this happens, and must be manually re-enabled.
SendError {
/// Transaction id used to identify this event.
transaction_id: String,
/// Error received while sending the event.
error: QueueWedgeError,
/// Whether the error is considered recoverable or not.
///
/// An error that's recoverable will disable the room's send queue,
/// while an unrecoverable error will be parked, until the user
/// decides to cancel sending it.
is_recoverable: bool,
},
/// The event has been unwedged and sending is now being retried.
RetryEvent {
/// Transaction id used to identify this event.
transaction_id: String,
},
/// The event has been sent to the server, and the query returned
/// successfully.
SentEvent {
/// Transaction id used to identify this event.
transaction_id: String,
/// Received event id from the send response.
event_id: String,
},
/// A media upload (consisting of a file and possibly a thumbnail) has made
/// progress.
MediaUpload {
/// The media event this uploaded media relates to.
related_to: String,
/// The final media source for the file if it has finished uploading.
file: Option<Arc<MediaSource>>,
/// The index of the media within the transaction. A file and its
/// thumbnail share the same index. Will always be 0 for non-gallery
/// media uploads.
index: u64,
/// The combined upload progress across the file and, if existing, its
/// thumbnail. For gallery uploads, the progress is reported per indexed
/// gallery item.
progress: AbstractProgress,
},
}
impl TryFrom<SdkRoomSendQueueUpdate> for RoomSendQueueUpdate {
type Error = ClientError;
fn try_from(value: SdkRoomSendQueueUpdate) -> std::result::Result<Self, Self::Error> {
Ok(match value {
SdkRoomSendQueueUpdate::CancelledLocalEvent { transaction_id } => {
Self::CancelledLocalEvent { transaction_id: transaction_id.into() }
}
SdkRoomSendQueueUpdate::MediaUpload { related_to, file, index, progress } => {
Self::MediaUpload {
related_to: related_to.into(),
file: file.map(|source| source.try_into().map(Arc::new)).transpose()?,
index,
progress: progress.into(),
}
}
SdkRoomSendQueueUpdate::NewLocalEvent(local_echo) => {
Self::NewLocalEvent { transaction_id: local_echo.transaction_id.into() }
}
SdkRoomSendQueueUpdate::ReplacedLocalEvent { transaction_id, .. } => {
Self::ReplacedLocalEvent { transaction_id: transaction_id.into() }
}
SdkRoomSendQueueUpdate::RetryEvent { transaction_id } => {
Self::RetryEvent { transaction_id: transaction_id.into() }
}
SdkRoomSendQueueUpdate::SendError { transaction_id, error, is_recoverable } => {
let as_queue_wedge_error: matrix_sdk::QueueWedgeError = (&*error).into();
Self::SendError {
transaction_id: transaction_id.into(),
error: as_queue_wedge_error.into(),
is_recoverable,
}
}
SdkRoomSendQueueUpdate::SentEvent { transaction_id, event_id } => {
Self::SentEvent { transaction_id: transaction_id.into(), event_id: event_id.into() }
}
})
}
}