From d665ab238e2fa7735fbd0a126cbaaaba00c69fe8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jorge=20Mart=C3=ADn?= Date: Tue, 22 Jul 2025 13:08:24 +0200 Subject: [PATCH] refactor: Make `SendMediaUploadRequest` a builder, allowing to create a media upload request with a more flexible API. Use `SendMediaUploadRequest` in `Media::upload` --- bindings/matrix-sdk-ffi/src/client.rs | 12 +-- crates/matrix-sdk/src/account.rs | 7 +- crates/matrix-sdk/src/client/futures.rs | 65 +++++++++++++--- crates/matrix-sdk/src/client/mod.rs | 19 ++--- crates/matrix-sdk/src/encryption/futures.rs | 15 ++-- crates/matrix-sdk/src/lib.rs | 3 + crates/matrix-sdk/src/media.rs | 75 ++++++++----------- crates/matrix-sdk/src/room/mod.rs | 16 +++- crates/matrix-sdk/src/send_queue/mod.rs | 27 ++++--- crates/matrix-sdk/src/test_utils/mocks/mod.rs | 8 +- 10 files changed, 151 insertions(+), 96 deletions(-) diff --git a/bindings/matrix-sdk-ffi/src/client.rs b/bindings/matrix-sdk-ffi/src/client.rs index 2f832a88d..0a9394427 100644 --- a/bindings/matrix-sdk-ffi/src/client.rs +++ b/bindings/matrix-sdk-ffi/src/client.rs @@ -39,8 +39,8 @@ use matrix_sdk::{ }, sliding_sync::Version as SdkSlidingSyncVersion, store::RoomLoadSettings as SdkRoomLoadSettings, - AuthApi, AuthSession, Client as MatrixClient, SessionChange, SessionTokens, - STATE_STORE_DATABASE_NAME, + AuthApi, AuthSession, Client as MatrixClient, SendMediaUploadRequest, SessionChange, + SessionTokens, STATE_STORE_DATABASE_NAME, }; use matrix_sdk_common::{stream::StreamExt, SendOutsideWasm, SyncOutsideWasm}; use matrix_sdk_ui::{ @@ -1030,10 +1030,12 @@ impl Client { progress_watcher: Option>, ) -> Result { let mime_type: mime::Mime = mime_type.parse().context("Parsing mime type")?; - let request = self.inner.media().upload(&mime_type, data, filename, None); + let send_media_request = SendMediaUploadRequest::new((*self.inner).clone(), data) + .with_content_type(mime_type.essence_str().to_owned()) + .with_filename(filename); if let Some(progress_watcher) = progress_watcher { - let mut subscriber = request.subscribe_to_send_progress(); + let mut subscriber = send_media_request.subscribe_to_send_progress(); get_runtime_handle().spawn(async move { while let Some(progress) = subscriber.next().await { progress_watcher.transmission_progress(progress.into()); @@ -1041,7 +1043,7 @@ impl Client { }); } - let response = request.await?; + let response = self.inner.media().upload(send_media_request).await?; Ok(String::from(response.content_uri)) } diff --git a/crates/matrix-sdk/src/account.rs b/crates/matrix-sdk/src/account.rs index 3bebbab13..68bd995d1 100644 --- a/crates/matrix-sdk/src/account.rs +++ b/crates/matrix-sdk/src/account.rs @@ -55,7 +55,7 @@ use ruma::{ use serde::Deserialize; use tracing::error; -use crate::{config::RequestConfig, Client, Error, Result}; +use crate::{config::RequestConfig, Client, Error, Result, SendMediaUploadRequest}; /// A high-level API to manage the client owner's account. /// @@ -264,7 +264,10 @@ impl Account { /// /// [`Media::upload()`]: crate::Media::upload pub async fn upload_avatar(&self, content_type: &Mime, data: Vec) -> Result { - let upload_response = self.client.media().upload(content_type, data, None, None).await?; + let send_media_request = SendMediaUploadRequest::new(self.client.clone(), data) + .with_content_type(content_type.essence_str().to_owned()); + + let upload_response = self.client.media().upload(send_media_request).await?; self.set_avatar_url(Some(&upload_response.content_uri)).await?; Ok(upload_response.content_uri) } diff --git a/crates/matrix-sdk/src/client/futures.rs b/crates/matrix-sdk/src/client/futures.rs index e83997b16..3953f115a 100644 --- a/crates/matrix-sdk/src/client/futures.rs +++ b/crates/matrix-sdk/src/client/futures.rs @@ -156,17 +156,50 @@ where } } -/// `IntoFuture` used to send media upload requests. It wraps another -/// [`SendRequest`], checking its size will be accepted by the homeserver before -/// uploading. +/// `IntoFuture` used to send media upload requests. It works as a builder which +/// can create a [`SendRequest`] given several configuration options, checking +/// its size will be accepted by the homeserver before uploading. #[allow(missing_debug_implementations)] pub struct SendMediaUploadRequest { - send_request: SendRequest, + client: Client, + request: media::create_content::v3::Request, + /// The [`RequestConfig`] to use for uploading the media file. + pub request_config: Option, + progress_observable: SharedObservable, } impl SendMediaUploadRequest { - pub fn new(request: SendRequest) -> Self { - Self { send_request: request } + /// Creates a new instance. + pub fn new(client: Client, file: Vec) -> Self { + Self { + client, + request: media::create_content::v3::Request::new(file), + request_config: Default::default(), + progress_observable: SharedObservable::new(TransmissionProgress::default()), + } + } + + /// Returns the data to upload. + pub fn data(&self) -> &Vec { + &self.request.file + } + + /// Sets the content type of the media for the media upload request. + pub fn with_content_type(mut self, content_type: impl Into) -> Self { + self.request.content_type = Some(content_type.into()); + self + } + + /// Sets the filename for the media upload request. + pub fn with_filename(mut self, filename: Option>) -> Self { + self.request.filename = filename.map(Into::into); + self + } + + /// Applies the provided [`RequestConfig`] to the future [`SendRequest`]. + pub fn with_request_config(mut self, request_config: Option) -> Self { + self.request_config = request_config; + self } /// Replace the default `SharedObservable` used for tracking upload @@ -179,14 +212,24 @@ impl SendMediaUploadRequest { mut self, send_progress: SharedObservable, ) -> Self { - self.send_request = self.send_request.with_send_progress_observable(send_progress); + self.progress_observable = send_progress; self } /// Get a subscriber to observe the progress of sending the request /// body. pub fn subscribe_to_send_progress(&self) -> Subscriber { - self.send_request.send_progress.subscribe() + self.progress_observable.subscribe() + } + + /// Creates the [`SendRequest`] using the provided parameters. + pub fn build_send_request(self) -> SendRequest { + SendRequest { + client: self.client, + request: self.request, + config: self.request_config, + send_progress: self.progress_observable, + } } } @@ -195,9 +238,9 @@ impl IntoFuture for SendMediaUploadRequest { boxed_into_future!(); fn into_future(self) -> Self::IntoFuture { - let request_length = self.send_request.request.file.len(); - let client = self.send_request.client.clone(); - let send_request = self.send_request; + let send_request = self.build_send_request(); + let request_length = send_request.request.file.len(); + let client = send_request.client.clone(); Box::pin(async move { let max_upload_size = client.load_or_fetch_max_upload_size().await?; diff --git a/crates/matrix-sdk/src/client/mod.rs b/crates/matrix-sdk/src/client/mod.rs index cdf9a7917..5b204a4e1 100644 --- a/crates/matrix-sdk/src/client/mod.rs +++ b/crates/matrix-sdk/src/client/mod.rs @@ -111,7 +111,10 @@ mod builder; pub(crate) mod caches; pub(crate) mod futures; -pub use self::builder::{sanitize_server_name, ClientBuildError, ClientBuilder}; +pub use self::{ + builder::{sanitize_server_name, ClientBuildError, ClientBuilder}, + futures::SendMediaUploadRequest, +}; #[cfg(not(target_family = "wasm"))] type NotificationHandlerFut = Pin + Send>>; @@ -2909,7 +2912,6 @@ pub(crate) mod tests { use assert_matches::assert_matches; use assert_matches2::assert_let; - use eyeball::SharedObservable; use futures_util::{pin_mut, FutureExt}; use js_int::{uint, UInt}; use matrix_sdk_base::{ @@ -2950,10 +2952,9 @@ pub(crate) mod tests { use crate::{ client::{futures::SendMediaUploadRequest, WeakClient}, config::RequestConfig, - futures::SendRequest, media::MediaError, test_utils::{client::MockClientBuilder, mocks::MatrixMockServer}, - Error, TransmissionProgress, + Error, }; #[async_test] @@ -3781,15 +3782,7 @@ pub(crate) mod tests { assert_eq!(*client.inner.server_max_upload_size.lock().await.get().unwrap(), uint!(1)); let data = vec![1, 2]; - let upload_request = - ruma::api::client::media::create_content::v3::Request::new(data.clone()); - let request = SendRequest { - client: client.clone(), - request: upload_request, - config: None, - send_progress: SharedObservable::new(TransmissionProgress::default()), - }; - let media_request = SendMediaUploadRequest::new(request); + let media_request = SendMediaUploadRequest::new(client, data.clone()); let error = media_request.await.err(); assert_let!(Some(Error::Media(MediaError::MediaTooLargeToUpload { max, current })) = error); diff --git a/crates/matrix-sdk/src/encryption/futures.rs b/crates/matrix-sdk/src/encryption/futures.rs index 763117a64..7a327bd60 100644 --- a/crates/matrix-sdk/src/encryption/futures.rs +++ b/crates/matrix-sdk/src/encryption/futures.rs @@ -23,7 +23,9 @@ use eyeball::{SharedObservable, Subscriber}; use matrix_sdk_common::boxed_into_future; use ruma::events::room::{EncryptedFile, EncryptedFileInit}; -use crate::{config::RequestConfig, Client, Media, Result, TransmissionProgress}; +use crate::{ + config::RequestConfig, Client, Media, Result, SendMediaUploadRequest, TransmissionProgress, +}; /// Future returned by [`Client::upload_encrypted_file`]. #[allow(missing_debug_implementations)] @@ -89,11 +91,12 @@ where let request_config = request_config.map(|config| config.timeout(Media::reasonable_upload_timeout(&buf))); - let response = client - .media() - .upload(&mime::APPLICATION_OCTET_STREAM, buf, None, request_config) - .with_send_progress_observable(send_progress) - .await?; + let send_media_request = SendMediaUploadRequest::new(self.client.clone(), buf) + .with_content_type(mime::APPLICATION_OCTET_STREAM.essence_str()) + .with_request_config(request_config) + .with_send_progress_observable(send_progress); + + let response = client.media().upload(send_media_request).await?; let file: EncryptedFile = { let keys = encryptor.finish(); diff --git a/crates/matrix-sdk/src/lib.rs b/crates/matrix-sdk/src/lib.rs index 2680d4cb1..0e8e3a59a 100644 --- a/crates/matrix-sdk/src/lib.rs +++ b/crates/matrix-sdk/src/lib.rs @@ -94,6 +94,9 @@ pub use sliding_sync::{ uniffi::setup_scaffolding!(); pub mod live_location_share; + +pub use client::SendMediaUploadRequest; + #[cfg(any(test, feature = "testing"))] pub mod test_utils; diff --git a/crates/matrix-sdk/src/media.rs b/crates/matrix-sdk/src/media.rs index 8bc0a4f84..52d79c722 100644 --- a/crates/matrix-sdk/src/media.rs +++ b/crates/matrix-sdk/src/media.rs @@ -41,8 +41,8 @@ use tempfile::{Builder as TempFileBuilder, NamedTempFile, TempDir}; use tokio::{fs::File as TokioFile, io::AsyncWriteExt}; use crate::{ - attachment::Thumbnail, client::futures::SendMediaUploadRequest, config::RequestConfig, Client, - Error, Result, TransmissionProgress, + attachment::Thumbnail, client::futures::SendMediaUploadRequest, Client, Error, Result, + TransmissionProgress, }; /// A conservative upload speed of 1Mbps @@ -165,24 +165,14 @@ impl Media { Self { client } } - /// Upload some media to the server. - /// - /// # Arguments - /// - /// * `content_type` - The type of the media, this will be used as the - /// content-type header. - /// - /// * `data` - Vector of bytes to be uploaded to the server. - /// - /// * `request_config` - Optional request configuration for the HTTP client, - /// overriding the default. If not provided, a reasonable timeout value is - /// inferred. + /// Upload some media to the server using the provided + /// [`SendMediaUploadRequest`]. /// /// # Examples /// /// ```no_run /// # use std::fs; - /// # use matrix_sdk::{Client, ruma::room_id}; + /// # use matrix_sdk::{Client, ruma::room_id, SendMediaUploadRequest}; /// # use url::Url; /// # use mime; /// # async { @@ -190,32 +180,25 @@ impl Media { /// # let mut client = Client::new(homeserver).await?; /// let image = fs::read("/home/example/my-cat.jpg")?; /// - /// let response = client - /// .media() - /// .upload(&mime::IMAGE_JPEG, image, Some("my-cat.jpg".to_string()), None) - /// .await?; + /// let send_media_request = SendMediaUploadRequest::new(client.clone(), image) + /// .with_content_type(mime::IMAGE_JPEG.essence_str().to_string()) + /// .with_filename(Some("my-cat.jpg")); + /// + /// let response = client.media().upload(send_media_request).await?; /// /// println!("Cat URI: {}", response.content_uri); /// # anyhow::Ok(()) }; /// ``` - pub fn upload( + pub async fn upload( &self, - content_type: &Mime, - data: Vec, - filename: Option, - request_config: Option, - ) -> SendMediaUploadRequest { - let request_config = request_config.unwrap_or_else(|| { - self.client.request_config().timeout(Self::reasonable_upload_timeout(&data)) + send_media_upload_request: SendMediaUploadRequest, + ) -> Result { + let request_config = send_media_upload_request.request_config.unwrap_or_else(|| { + self.client + .request_config() + .timeout(Self::reasonable_upload_timeout(send_media_upload_request.data())) }); - - let request = assign!(media::create_content::v3::Request::new(data), { - filename, - content_type: Some(content_type.essence_str().to_owned()), - }); - - let request = self.client.send(request).with_request_config(request_config); - SendMediaUploadRequest::new(request) + send_media_upload_request.with_request_config(Some(request_config)).await } /// Returns a reasonable upload timeout for an upload, based on the size of @@ -732,11 +715,12 @@ impl Media { let upload_thumbnail = self.upload_thumbnail(thumbnail, filename.clone(), send_progress.clone()); - let upload_attachment = async move { - self.upload(content_type, data, filename, None) - .with_send_progress_observable(send_progress) - .await - }; + let send_media_request = SendMediaUploadRequest::new(self.client.clone(), data) + .with_content_type(content_type.essence_str().to_owned()) + .with_filename(filename) + .with_send_progress_observable(send_progress); + + let upload_attachment = async move { self.upload(send_media_request).await }; let (thumbnail, response) = try_join(upload_thumbnail, upload_attachment).await?; @@ -758,10 +742,13 @@ impl Media { let (data, content_type, thumbnail_info) = thumbnail.into_parts(); let filename = filename.map(|name| format!("thumbnail-{name}")); - let response = self - .upload(&content_type, data, filename, None) - .with_send_progress_observable(send_progress) - .await?; + + let send_media_request = SendMediaUploadRequest::new(self.client.clone(), data) + .with_content_type(content_type.essence_str().to_owned()) + .with_filename(filename) + .with_send_progress_observable(send_progress); + + let response = self.upload(send_media_request).await?; let url = response.content_uri; Ok(Some((MediaSource::Plain(url), thumbnail_info))) diff --git a/crates/matrix-sdk/src/room/mod.rs b/crates/matrix-sdk/src/room/mod.rs index 782f1b9eb..322295eda 100644 --- a/crates/matrix-sdk/src/room/mod.rs +++ b/crates/matrix-sdk/src/room/mod.rs @@ -154,7 +154,8 @@ use crate::{ }, sync::RoomUpdate, utils::{IntoRawMessageLikeEventContent, IntoRawStateEventContent}, - BaseRoom, Client, Error, HttpResult, Result, RoomState, TransmissionProgress, + BaseRoom, Client, Error, HttpResult, Result, RoomState, SendMediaUploadRequest, + TransmissionProgress, }; #[cfg(feature = "e2e-encryption")] use crate::{crypto::types::events::CryptoContextInfo, encryption::backups::BackupState}; @@ -2258,7 +2259,13 @@ impl Room { let (media_source, thumbnail) = self .client .media() - .upload_plain_media_and_thumbnail(content_type, data.clone(), thumbnail, send_progress) + .upload_plain_media_and_thumbnail( + content_type, + data.clone(), + Some(filename.clone()), + thumbnail, + send_progress, + ) .await?; if store_in_cache { @@ -2515,7 +2522,10 @@ impl Room { ) -> Result { self.ensure_room_joined()?; - let upload_response = self.client.media().upload(mime, data, None, None).await?; + let send_media_request = SendMediaUploadRequest::new(self.client.clone(), data) + .with_content_type(mime.essence_str().to_owned()); + + let upload_response = self.client.media().upload(send_media_request).await?; let mut info = info.unwrap_or_default(); info.blurhash = upload_response.blurhash; info.mimetype = Some(mime.to_string()); diff --git a/crates/matrix-sdk/src/send_queue/mod.rs b/crates/matrix-sdk/src/send_queue/mod.rs index 95cd02ed0..e6d576d08 100644 --- a/crates/matrix-sdk/src/send_queue/mod.rs +++ b/crates/matrix-sdk/src/send_queue/mod.rs @@ -142,7 +142,7 @@ use as_variant::as_variant; use matrix_sdk_base::store::FinishGalleryItemInfo; use matrix_sdk_base::{ event_cache::store::EventCacheStoreError, - media::MediaRequestParameters, + media::{MediaEventContent, MediaRequestParameters}, store::{ ChildTransactionId, DependentQueuedRequest, DependentQueuedRequestKind, DynStateStore, FinishUploadThumbnailInfo, QueueWedgeError, QueuedRequest, QueuedRequestKind, @@ -169,7 +169,7 @@ use ruma::{ }; use tokio::sync::{broadcast, oneshot, Mutex, Notify, OwnedMutexGuard}; use tracing::{debug, error, info, instrument, trace, warn}; -use matrix_sdk_base::media::MediaEventContent; + #[cfg(feature = "e2e-encryption")] use crate::crypto::{OlmError, SessionRecipientCollectionError}; use crate::{ @@ -177,7 +177,7 @@ use crate::{ config::RequestConfig, error::RetryKind, room::{edit::EditedContent, WeakRoom}, - Client, Media, Room, + Client, Media, Room, SendMediaUploadRequest, }; mod upload; @@ -789,11 +789,13 @@ impl RoomSendQueue { trace!("upload will be in clear text (room without encryption)"); let request_config = RequestConfig::short_retry() .timeout(Media::reasonable_upload_timeout(&data)); - let res = room - .client() - .media() - .upload(&mime, data, filename, Some(request_config)) - .await?; + + let send_media_request = SendMediaUploadRequest::new(room.client(), data) + .with_content_type(mime.essence_str()) + .with_request_config(Some(request_config)) + .with_filename(filename); + + let res = room.client().media().upload(send_media_request).await?; MediaSource::Plain(res.content_uri) }; @@ -801,8 +803,13 @@ impl RoomSendQueue { let media_source = { let request_config = RequestConfig::short_retry() .timeout(Media::reasonable_upload_timeout(&data)); - let res = - room.client().media().upload(&mime, data, Some(request_config)).await?; + + let send_media_request = SendMediaUploadRequest::new(room.client(), data) + .with_content_type(mime.essence_str()) + .with_request_config(Some(request_config)) + .with_filename(filename); + + let res = room.client().media().upload(send_media_request).await?; MediaSource::Plain(res.content_uri) }; diff --git a/crates/matrix-sdk/src/test_utils/mocks/mod.rs b/crates/matrix-sdk/src/test_utils/mocks/mod.rs index 1baa80650..83f9c8b90 100644 --- a/crates/matrix-sdk/src/test_utils/mocks/mod.rs +++ b/crates/matrix-sdk/src/test_utils/mocks/mod.rs @@ -2583,7 +2583,8 @@ impl<'a> MockEndpoint<'a, UploadEndpoint> { /// # Examples /// /// ```no_run - /// # tokio_test::block_on(async { + /// # use matrix_sdk::SendMediaUploadRequest; + /// tokio_test::block_on(async { /// use matrix_sdk::{ /// ruma::{event_id, mxc_uri, room_id}, /// test_utils::mocks::MatrixMockServer, @@ -2595,7 +2596,10 @@ impl<'a> MockEndpoint<'a, UploadEndpoint> { /// let (receiver, upload_mock) = server.mock_upload().ok_with_capture(mxid); /// let client = server.client_builder().build().await; /// - /// client.media().upload(&mime::TEXT_PLAIN, vec![1, 2, 3, 4, 5], None).await?; + /// let send_media_request = SendMediaUploadRequest::new(client.clone(), vec![1, 2, 3, 4, 5]) + /// .with_content_type(mime::TEXT_PLAIN.essence_str()); + /// + /// client.media().upload(send_media_request).await?; /// /// let uploaded = receiver.await?; ///