Merge pull request #4571 from zecakeh/media-retention-policy

feat: Add MediaRetentionPolicy to the EventCacheStore, take 2
This commit is contained in:
Ivan Enderlin
2025-01-28 17:27:02 +01:00
committed by GitHub
17 changed files with 3123 additions and 122 deletions

View File

@@ -6,19 +6,35 @@ All notable changes to this project will be documented in this file.
## [Unreleased] - ReleaseDate
### Breaking changes
- Replaced `Room::compute_display_name` with the reintroduced `Room::display_name()`. The new
method computes a display name, or return a cached value from the previous successful computation.
If you need a sync variant, consider using `Room::cached_display_name()`.
- [**breaking**]: The reexported types `SyncTimelineEvent` and `TimelineEvent` have been fused into a single type
`TimelineEvent`, and its field `push_actions` has been made `Option`al (it is set to `None` when
we couldn't compute the push actions, because we lacked some information).
([#4568](https://github.com/matrix-org/matrix-rust-sdk/pull/4568))
### Features
### Bug Fixes
- [**breaking**] `EventCacheStore` makes it possible to control which media
content is accepted in the media cache, and how long it should be kept, with a
`MediaRetentionPolicy`:
- `EventCacheStore::add_media_content()` has an extra argument,
`ignore_policy`, which decides whether a media content should ignore the
`MediaRetentionPolicy`. It should be stored alongside the media content.
- `EventCacheStore` has four new methods: `media_retention_policy()`,
`set_media_retention_policy()`, `set_ignore_media_retention_policy()` and
`clean_up_media_cache()`.
- `EventCacheStore` implementations should delegate media cache methods to the
methods of the same name of `MediaService` to use the `MediaRetentionPolicy`.
They need to implement the `EventCacheStoreMedia` trait that can be tested
with the `event_cache_store_media_integration_tests!` macro.
([#4571](https://github.com/matrix-org/matrix-rust-sdk/pull/4571))
### Refactor
- [**breaking**] Replaced `Room::compute_display_name` with the reintroduced
`Room::display_name()`. The new method computes a display name, or returns a
cached value from the previous successful computation. If you need a sync
variant, consider using `Room::cached_display_name()`.
([#4470](https://github.com/matrix-org/matrix-rust-sdk/pull/4470))
- [**breaking**] The reexported types `SyncTimelineEvent` and `TimelineEvent`
have been fused into a single type `TimelineEvent`, and its field
`push_actions` has been made `Option`al (it is set to `None` when we couldn't
compute the push actions, because we lacked some information).
([#4568](https://github.com/matrix-org/matrix-rust-sdk/pull/4568))
## [0.9.0] - 2024-12-18

View File

@@ -32,7 +32,7 @@ use ruma::{
push::Action, room_id, uint, RoomId,
};
use super::DynEventCacheStore;
use super::{media::IgnoreMediaRetentionPolicy, DynEventCacheStore};
use crate::{
event_cache::{Event, Gap},
media::{MediaFormat, MediaRequestParameters, MediaThumbnailSettings},
@@ -168,7 +168,9 @@ impl EventCacheStoreIntegrationTests for DynEventCacheStore {
);
// Let's add the media.
self.add_media_content(&request_file, content.clone()).await.expect("adding media failed");
self.add_media_content(&request_file, content.clone(), IgnoreMediaRetentionPolicy::No)
.await
.expect("adding media failed");
// Media is present in the cache.
assert_eq!(
@@ -196,7 +198,7 @@ impl EventCacheStoreIntegrationTests for DynEventCacheStore {
);
// Let's add the media again.
self.add_media_content(&request_file, content.clone())
self.add_media_content(&request_file, content.clone(), IgnoreMediaRetentionPolicy::No)
.await
.expect("adding media again failed");
@@ -207,9 +209,13 @@ impl EventCacheStoreIntegrationTests for DynEventCacheStore {
);
// Let's add the thumbnail media.
self.add_media_content(&request_thumbnail, thumbnail_content.clone())
.await
.expect("adding thumbnail failed");
self.add_media_content(
&request_thumbnail,
thumbnail_content.clone(),
IgnoreMediaRetentionPolicy::No,
)
.await
.expect("adding thumbnail failed");
// Media's thumbnail is present.
assert_eq!(
@@ -225,9 +231,13 @@ impl EventCacheStoreIntegrationTests for DynEventCacheStore {
);
// Let's add another media with a different URI.
self.add_media_content(&request_other_file, other_content.clone())
.await
.expect("adding other media failed");
self.add_media_content(
&request_other_file,
other_content.clone(),
IgnoreMediaRetentionPolicy::No,
)
.await
.expect("adding other media failed");
// Other file is present.
assert_eq!(
@@ -279,7 +289,9 @@ impl EventCacheStoreIntegrationTests for DynEventCacheStore {
assert!(self.get_media_content(&req).await.unwrap().is_none(), "unexpected media found");
// Add the media.
self.add_media_content(&req, content.clone()).await.expect("adding media failed");
self.add_media_content(&req, content.clone(), IgnoreMediaRetentionPolicy::No)
.await
.expect("adding media failed");
// Sanity-check: media is found after adding it.
assert_eq!(self.get_media_content(&req).await.unwrap().unwrap(), b"hello");

View File

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,330 @@
// Copyright 2025 Kévin Commaille
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Configuration to decide whether or not to keep media in the cache, allowing
//! periodic cleanups so that the size of the media cache does not grow
//! indefinitely.
//!
//! To perform a cleanup, first set the [`MediaRetentionPolicy`] to use with
//! [`EventCacheStore::set_media_retention_policy()`]. Then call
//! [`EventCacheStore::clean_up_media_cache()`].
//!
//! In the future, other settings will make it possible to run automatic
//! periodic cleanup jobs.
//!
//! [`EventCacheStore::set_media_retention_policy()`]: crate::event_cache::store::EventCacheStore::set_media_retention_policy
//! [`EventCacheStore::clean_up_media_cache()`]: crate::event_cache::store::EventCacheStore::clean_up_media_cache
use ruma::time::{Duration, SystemTime};
use serde::{Deserialize, Serialize};
/// The retention policy for media content used by the [`EventCacheStore`].
///
/// Each limitation is optional. [`MediaRetentionPolicy::empty()`] leaves all
/// of them unset, while the `Default` implementation sets all of them to the
/// default values documented on the fields below.
///
/// Note that the documented defaults only apply to the `Default`
/// implementation: because of `#[serde(default)]`, a field that is missing
/// from the serialized data is deserialized as `None`, not as the documented
/// default value.
///
/// [`EventCacheStore`]: crate::event_cache::store::EventCacheStore
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[non_exhaustive]
pub struct MediaRetentionPolicy {
/// The maximum authorized size of the overall media cache, in bytes.
///
/// The cache size is defined as the sum of the sizes of all the (possibly
/// encrypted) media contents in the cache, excluding any metadata
/// associated with them.
///
/// If this is set and the cache size is bigger than this value, the oldest
/// media contents in the cache will be removed during a cleanup until the
/// cache size is below this threshold.
///
/// Note that it is possible for the cache size to temporarily exceed this
/// value between two cleanups.
///
/// This value also bounds the size of a single media content, see
/// [`MediaRetentionPolicy::computed_max_file_size()`].
///
/// Defaults to 400 MiB.
// Not serialized when `None`; a missing key is deserialized as `None`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub max_cache_size: Option<usize>,
/// The maximum authorized size of a single media content, in bytes.
///
/// The size of a media content is the size taken by the content in the
/// database, after it was possibly encrypted, so it might differ from the
/// initial size of the content.
///
/// The maximum authorized size of a single media content is actually the
/// lowest value between `max_cache_size` and `max_file_size`. If only one
/// of the two is set, that one is used.
///
/// If it is set, media content bigger than the maximum size will not be
/// cached. If the maximum size changed after media content that exceeds the
/// new value was cached, the corresponding content will be removed
/// during a cleanup.
///
/// Defaults to 20 MiB.
// Not serialized when `None`; a missing key is deserialized as `None`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub max_file_size: Option<usize>,
/// The duration after which unaccessed media content is considered
/// expired.
///
/// If this is set, media content whose last access is at least this old
/// will be removed from the media cache during a cleanup.
///
/// Defaults to 60 days.
// Not serialized when `None`; a missing key is deserialized as `None`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub last_access_expiry: Option<Duration>,
}
impl MediaRetentionPolicy {
    /// Build a [`MediaRetentionPolicy`] holding the default limitations.
    ///
    /// This is the same value as the one returned by the `Default`
    /// implementation.
    pub fn new() -> Self {
        Default::default()
    }

    /// Build a [`MediaRetentionPolicy`] without any limitation.
    ///
    /// With such a policy every media content is cached and a cleanup does
    /// not remove anything.
    pub fn empty() -> Self {
        Self { last_access_expiry: None, max_file_size: None, max_cache_size: None }
    }

    /// Replace the maximum authorized size of the overall media cache, in
    /// bytes. `None` removes the limitation.
    pub fn with_max_cache_size(self, size: Option<usize>) -> Self {
        Self { max_cache_size: size, ..self }
    }

    /// Replace the maximum authorized size of a single media content, in
    /// bytes. `None` removes the limitation.
    pub fn with_max_file_size(self, size: Option<usize>) -> Self {
        Self { max_file_size: size, ..self }
    }

    /// Replace the duration after which unaccessed media content is
    /// considered expired. `None` removes the limitation.
    pub fn with_last_access_expiry(self, duration: Option<Duration>) -> Self {
        Self { last_access_expiry: duration, ..self }
    }

    /// Whether at least one limitation is set on this policy.
    ///
    /// When this returns `false`, running a cleanup job would have no effect.
    pub fn has_limitations(&self) -> bool {
        let unlimited = self.max_cache_size.is_none()
            && self.max_file_size.is_none()
            && self.last_access_expiry.is_none();

        !unlimited
    }

    /// Whether the given overall cache size is strictly bigger than the
    /// maximum authorized size of the media cache.
    ///
    /// Always `false` when no maximum cache size is set.
    ///
    /// # Arguments
    ///
    /// * `size` - The overall size of the media cache to check, in bytes.
    pub fn exceeds_max_cache_size(&self, size: usize) -> bool {
        matches!(self.max_cache_size, Some(limit) if size > limit)
    }

    /// The effective maximum authorized size of a single media content, in
    /// bytes.
    ///
    /// When both `max_cache_size` and `max_file_size` are set this is the
    /// lowest of the two; when only one is set, it is that one; otherwise
    /// there is no limit and `None` is returned.
    pub fn computed_max_file_size(&self) -> Option<usize> {
        match (self.max_cache_size, self.max_file_size) {
            (Some(cache_limit), Some(file_limit)) => Some(cache_limit.min(file_limit)),
            // At most one of the two is set here: keep whichever it is.
            (cache_limit, file_limit) => cache_limit.or(file_limit),
        }
    }

    /// Whether the given size is strictly bigger than the computed maximum
    /// authorized size of a single media content.
    ///
    /// Always `false` when there is no computed maximum.
    ///
    /// # Arguments
    ///
    /// * `size` - The size of the media content to check, in bytes.
    pub fn exceeds_max_file_size(&self, size: usize) -> bool {
        match self.computed_max_file_size() {
            Some(limit) => size > limit,
            None => false,
        }
    }

    /// Whether a content that was last accessed at the given time has
    /// expired.
    ///
    /// The content is expired when the time elapsed since its last access is
    /// equal to or bigger than `last_access_expiry`. Always `false` when no
    /// expiry is set.
    ///
    /// # Arguments
    ///
    /// * `current_time` - The current time.
    ///
    /// * `last_access_time` - The time when the media content to check was
    ///   last accessed.
    pub fn has_content_expired(
        &self,
        current_time: SystemTime,
        last_access_time: SystemTime,
    ) -> bool {
        let Some(expiry) = self.last_access_expiry else {
            return false;
        };

        match current_time.duration_since(last_access_time) {
            Ok(elapsed) => elapsed >= expiry,
            // The last access time is newer than the current time. This
            // shouldn't happen, but in this case the content cannot be
            // expired.
            Err(_) => false,
        }
    }
}
impl Default for MediaRetentionPolicy {
    /// The default policy: a cache of at most 400 MiB, files of at most
    /// 20 MiB, and content expiring 60 days after its last access.
    fn default() -> Self {
        /// Number of bytes in one MiB.
        const MIB: usize = 1024 * 1024;
        /// Number of seconds in one day.
        const SECS_PER_DAY: u64 = 24 * 60 * 60;

        Self {
            max_cache_size: Some(400 * MIB),
            max_file_size: Some(20 * MIB),
            last_access_expiry: Some(Duration::from_secs(60 * SECS_PER_DAY)),
        }
    }
}
#[cfg(test)]
mod tests {
use ruma::time::{Duration, SystemTime};
use super::MediaRetentionPolicy;
/// `has_limitations()` must be `true` as soon as any single limitation is
/// set, and `false` again once that limitation is removed.
#[test]
fn test_media_retention_policy_has_limitations() {
// Start without any limitation.
let mut policy = MediaRetentionPolicy::empty();
assert!(!policy.has_limitations());
// Only the last access expiry.
policy = policy.with_last_access_expiry(Some(Duration::from_secs(60)));
assert!(policy.has_limitations());
policy = policy.with_last_access_expiry(None);
assert!(!policy.has_limitations());
// Only the max cache size.
policy = policy.with_max_cache_size(Some(1_024));
assert!(policy.has_limitations());
policy = policy.with_max_cache_size(None);
assert!(!policy.has_limitations());
// Only the max file size.
policy = policy.with_max_file_size(Some(1_024));
assert!(policy.has_limitations());
policy = policy.with_max_file_size(None);
assert!(!policy.has_limitations());
// With default values.
assert!(MediaRetentionPolicy::new().has_limitations());
}
/// `max_cache_size` alone limits both the overall cache size and the size of
/// a single file; a size equal to the limit does not exceed it.
#[test]
fn test_media_retention_policy_max_cache_size() {
let file_size = 2_048;
// Without limitation nothing is exceeded.
let mut policy = MediaRetentionPolicy::empty();
assert!(!policy.exceeds_max_cache_size(file_size));
assert_eq!(policy.computed_max_file_size(), None);
assert!(!policy.exceeds_max_file_size(file_size));
// Limit above the size.
policy = policy.with_max_cache_size(Some(4_096));
assert!(!policy.exceeds_max_cache_size(file_size));
assert_eq!(policy.computed_max_file_size(), Some(4_096));
assert!(!policy.exceeds_max_file_size(file_size));
// Limit equal to the size: the comparison is strict, so not exceeded.
policy = policy.with_max_cache_size(Some(2_048));
assert!(!policy.exceeds_max_cache_size(file_size));
assert_eq!(policy.computed_max_file_size(), Some(2_048));
assert!(!policy.exceeds_max_file_size(file_size));
// Limit below the size.
policy = policy.with_max_cache_size(Some(1_024));
assert!(policy.exceeds_max_cache_size(file_size));
assert_eq!(policy.computed_max_file_size(), Some(1_024));
assert!(policy.exceeds_max_file_size(file_size));
}
/// The computed max file size is `max_file_size` when it is the only one set,
/// and the lowest of `max_file_size` and `max_cache_size` when both are set.
#[test]
fn test_media_retention_policy_max_file_size() {
let file_size = 2_048;
let mut policy = MediaRetentionPolicy::empty();
assert_eq!(policy.computed_max_file_size(), None);
assert!(!policy.exceeds_max_file_size(file_size));
// With max_file_size only.
policy = policy.with_max_file_size(Some(4_096));
assert_eq!(policy.computed_max_file_size(), Some(4_096));
assert!(!policy.exceeds_max_file_size(file_size));
policy = policy.with_max_file_size(Some(2_048));
assert_eq!(policy.computed_max_file_size(), Some(2_048));
assert!(!policy.exceeds_max_file_size(file_size));
policy = policy.with_max_file_size(Some(1_024));
assert_eq!(policy.computed_max_file_size(), Some(1_024));
assert!(policy.exceeds_max_file_size(file_size));
// With max_cache_size as well.
// max_file_size (1 024) is lower than max_cache_size (2 048).
policy = policy.with_max_cache_size(Some(2_048));
assert_eq!(policy.computed_max_file_size(), Some(1_024));
assert!(policy.exceeds_max_file_size(file_size));
// Both limits are equal.
policy = policy.with_max_file_size(Some(2_048));
assert_eq!(policy.computed_max_file_size(), Some(2_048));
assert!(!policy.exceeds_max_file_size(file_size));
// max_cache_size (2 048) is lower than max_file_size (4 096).
policy = policy.with_max_file_size(Some(4_096));
assert_eq!(policy.computed_max_file_size(), Some(2_048));
assert!(!policy.exceeds_max_file_size(file_size));
policy = policy.with_max_cache_size(Some(1_024));
assert_eq!(policy.computed_max_file_size(), Some(1_024));
assert!(policy.exceeds_max_file_size(file_size));
}
/// Content expires when the time elapsed since its last access is equal to
/// or bigger than the expiry, and never when the current time is before the
/// last access time.
#[test]
fn test_media_retention_policy_has_content_expired() {
// The content was last accessed 30 seconds after the epoch. It is checked
// at 4 different "current" times: before the last access, at the last
// access, and 30 and 90 seconds after it.
let epoch = SystemTime::UNIX_EPOCH;
let last_access_time = epoch + Duration::from_secs(30);
let epoch_plus_60 = epoch + Duration::from_secs(60);
let epoch_plus_120 = epoch + Duration::from_secs(120);
// Without expiry, content never expires.
let mut policy = MediaRetentionPolicy::empty();
assert!(!policy.has_content_expired(epoch, last_access_time));
assert!(!policy.has_content_expired(last_access_time, last_access_time));
assert!(!policy.has_content_expired(epoch_plus_60, last_access_time));
assert!(!policy.has_content_expired(epoch_plus_120, last_access_time));
// Expiry of 120 seconds: at most 90 seconds have elapsed.
policy = policy.with_last_access_expiry(Some(Duration::from_secs(120)));
assert!(!policy.has_content_expired(epoch, last_access_time));
assert!(!policy.has_content_expired(last_access_time, last_access_time));
assert!(!policy.has_content_expired(epoch_plus_60, last_access_time));
assert!(!policy.has_content_expired(epoch_plus_120, last_access_time));
// Expiry of 60 seconds: only the check 90 seconds later is expired.
policy = policy.with_last_access_expiry(Some(Duration::from_secs(60)));
assert!(!policy.has_content_expired(epoch, last_access_time));
assert!(!policy.has_content_expired(last_access_time, last_access_time));
assert!(!policy.has_content_expired(epoch_plus_60, last_access_time));
assert!(policy.has_content_expired(epoch_plus_120, last_access_time));
// Expiry of 30 seconds: an elapsed time equal to the expiry is expired.
policy = policy.with_last_access_expiry(Some(Duration::from_secs(30)));
assert!(!policy.has_content_expired(epoch, last_access_time));
assert!(!policy.has_content_expired(last_access_time, last_access_time));
assert!(policy.has_content_expired(epoch_plus_60, last_access_time));
assert!(policy.has_content_expired(epoch_plus_120, last_access_time));
// Expiry of 0 seconds: expired immediately, except when the current time
// is before the last access time.
policy = policy.with_last_access_expiry(Some(Duration::from_secs(0)));
assert!(!policy.has_content_expired(epoch, last_access_time));
assert!(policy.has_content_expired(last_access_time, last_access_time));
assert!(policy.has_content_expired(epoch_plus_60, last_access_time));
assert!(policy.has_content_expired(epoch_plus_120, last_access_time));
}
}

View File

@@ -0,0 +1,884 @@
// Copyright 2025 Kévin Commaille
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::fmt;
use async_trait::async_trait;
use matrix_sdk_common::{locks::Mutex, AsyncTraitDeps};
use ruma::{time::SystemTime, MxcUri};
use tokio::sync::Mutex as AsyncMutex;
use super::MediaRetentionPolicy;
use crate::{event_cache::store::EventCacheStoreError, media::MediaRequestParameters};
/// API for implementors of [`EventCacheStore`] to manage their media through
/// their implementation of [`EventCacheStoreMedia`].
///
/// The service keeps the current [`MediaRetentionPolicy`] in memory and
/// provides the current time to the `*_inner` methods of the store.
///
/// [`EventCacheStore`]: crate::event_cache::store::EventCacheStore
#[derive(Debug)]
pub struct MediaService<Time: TimeProvider = DefaultTimeProvider> {
/// The time provider.
///
/// Its `now()` is the time passed to the store when adding or getting
/// media content, and when cleaning up the cache.
time_provider: Time,
/// The current [`MediaRetentionPolicy`].
///
/// Starts as an empty policy, and is replaced by `restore()` and
/// `set_media_retention_policy()`.
policy: Mutex<MediaRetentionPolicy>,
/// A mutex to ensure a single cleanup is running at a time.
///
/// `clean_up_media_cache()` only tries to lock it and returns immediately
/// when it is already locked.
cleanup_guard: AsyncMutex<()>,
}
impl MediaService {
    /// Build a `MediaService` that uses the [`DefaultTimeProvider`] and
    /// starts with an empty `MediaRetentionPolicy`.
    ///
    /// [`MediaService::restore()`] should be called after constructing the
    /// `MediaService` to restore its previous state.
    pub fn new() -> Self {
        Self::with_time_provider(DefaultTimeProvider)
    }
}

impl Default for MediaService {
    /// Same as [`MediaService::new()`].
    fn default() -> Self {
        Self::new()
    }
}
impl<Time> MediaService<Time>
where
    Time: TimeProvider,
{
    /// Build a `MediaService` around the given `TimeProvider`.
    ///
    /// The service starts with an empty `MediaRetentionPolicy`.
    fn with_time_provider(time_provider: Time) -> Self {
        let policy = Mutex::new(MediaRetentionPolicy::empty());
        let cleanup_guard = AsyncMutex::new(());

        Self { time_provider, policy, cleanup_guard }
    }

    /// Restore the [`MediaRetentionPolicy`] that was persisted in the store.
    ///
    /// This should be called immediately after constructing the
    /// `MediaService`. When `policy` is `None`, the current policy is left
    /// untouched.
    ///
    /// # Arguments
    ///
    /// * `policy` - The `MediaRetentionPolicy` that was persisted in the
    ///   store, if any.
    pub fn restore(&self, policy: Option<MediaRetentionPolicy>) {
        let Some(restored) = policy else {
            return;
        };

        *self.policy.lock() = restored;
    }

    /// Persist the given `MediaRetentionPolicy` in the store and use it in
    /// this service.
    ///
    /// # Arguments
    ///
    /// * `store` - The `EventCacheStoreMedia`.
    ///
    /// * `policy` - The `MediaRetentionPolicy` to use.
    pub async fn set_media_retention_policy<Store: EventCacheStoreMedia>(
        &self,
        store: &Store,
        policy: MediaRetentionPolicy,
    ) -> Result<(), Store::Error> {
        // Persist first: the in-memory policy is only replaced when the store
        // did not return an error.
        store.set_media_retention_policy_inner(policy).await?;

        *self.policy.lock() = policy;

        Ok(())
    }

    /// The `MediaRetentionPolicy` currently used by this service.
    pub fn media_retention_policy(&self) -> MediaRetentionPolicy {
        let current = self.policy.lock();
        *current
    }

    /// Add a media file's content in the media store.
    ///
    /// Unless the policy is ignored, content that exceeds the maximum file
    /// size of the current policy is silently not cached.
    ///
    /// # Arguments
    ///
    /// * `store` - The `EventCacheStoreMedia`.
    ///
    /// * `request` - The `MediaRequestParameters` of the file.
    ///
    /// * `content` - The content of the file.
    ///
    /// * `ignore_policy` - Whether the current `MediaRetentionPolicy` should
    ///   be ignored.
    pub async fn add_media_content<Store: EventCacheStoreMedia>(
        &self,
        store: &Store,
        request: &MediaRequestParameters,
        content: Vec<u8>,
        ignore_policy: IgnoreMediaRetentionPolicy,
    ) -> Result<(), Store::Error> {
        let policy = self.media_retention_policy();

        let policy_applies = matches!(ignore_policy, IgnoreMediaRetentionPolicy::No);
        if policy_applies && policy.exceeds_max_file_size(content.len()) {
            // Too big for the policy: the store is not called at all.
            return Ok(());
        }

        let now = self.time_provider.now();
        store.add_media_content_inner(request, content, now, policy, ignore_policy).await
    }

    /// Change whether the current [`MediaRetentionPolicy`] should be ignored
    /// for the given media.
    ///
    /// The change will be taken into account in the next cleanup.
    ///
    /// # Arguments
    ///
    /// * `store` - The `EventCacheStoreMedia`.
    ///
    /// * `request` - The `MediaRequestParameters` of the file.
    ///
    /// * `ignore_policy` - Whether the current `MediaRetentionPolicy` should
    ///   be ignored.
    pub async fn set_ignore_media_retention_policy<Store: EventCacheStoreMedia>(
        &self,
        store: &Store,
        request: &MediaRequestParameters,
        ignore_policy: IgnoreMediaRetentionPolicy,
    ) -> Result<(), Store::Error> {
        let outcome = store.set_ignore_media_retention_policy_inner(request, ignore_policy).await;
        outcome
    }

    /// Get a media file's content out of the media store.
    ///
    /// The current time is given to the store along with the request.
    ///
    /// # Arguments
    ///
    /// * `store` - The `EventCacheStoreMedia`.
    ///
    /// * `request` - The `MediaRequestParameters` of the file.
    pub async fn get_media_content<Store: EventCacheStoreMedia>(
        &self,
        store: &Store,
        request: &MediaRequestParameters,
    ) -> Result<Option<Vec<u8>>, Store::Error> {
        let now = self.time_provider.now();
        store.get_media_content_inner(request, now).await
    }

    /// Get a media file's content associated to an `MxcUri` from the media
    /// store.
    ///
    /// The current time is given to the store along with the URI.
    ///
    /// # Arguments
    ///
    /// * `store` - The `EventCacheStoreMedia`.
    ///
    /// * `uri` - The `MxcUri` of the media file.
    pub async fn get_media_content_for_uri<Store: EventCacheStoreMedia>(
        &self,
        store: &Store,
        uri: &MxcUri,
    ) -> Result<Option<Vec<u8>>, Store::Error> {
        let now = self.time_provider.now();
        store.get_media_content_for_uri_inner(uri, now).await
    }

    /// Clean up the media cache with the current `MediaRetentionPolicy`.
    ///
    /// This is a noop if there is already an ongoing cleanup, or if the
    /// current policy has no limitations.
    ///
    /// # Arguments
    ///
    /// * `store` - The `EventCacheStoreMedia`.
    pub async fn clean_up_media_cache<Store: EventCacheStoreMedia>(
        &self,
        store: &Store,
    ) -> Result<(), Store::Error> {
        // The guard is kept until the end of the cleanup.
        let _guard = match self.cleanup_guard.try_lock() {
            Ok(guard) => guard,
            // There is another ongoing cleanup.
            Err(_) => return Ok(()),
        };

        let policy = self.media_retention_policy();

        if policy.has_limitations() {
            let now = self.time_provider.now();
            store.clean_up_media_cache_inner(policy, now).await
        } else {
            // No need to call the backend.
            Ok(())
        }
    }
}
/// An abstract trait that can be used to implement different store backends
/// for the media cache of the SDK.
///
/// The main purposes of this trait are to be able to centralize where we handle
/// [`MediaRetentionPolicy`] by wrapping this in a [`MediaService`], and to
/// simplify the implementation of tests by being able to have complete control
/// over the `SystemTime`s provided to the store.
///
/// Every `*_inner` method is the backend counterpart of the [`MediaService`]
/// method of the same name without the suffix; the `current_time` arguments
/// are provided by the `TimeProvider` of the [`MediaService`].
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
pub trait EventCacheStoreMedia: AsyncTraitDeps {
/// The error type used by this media cache store.
///
/// It must be convertible into an [`EventCacheStoreError`].
type Error: fmt::Debug + Into<EventCacheStoreError>;
/// The persisted media retention policy in the media cache.
///
/// Returns `Ok(None)` if no policy was persisted.
async fn media_retention_policy_inner(
&self,
) -> Result<Option<MediaRetentionPolicy>, Self::Error>;
/// Persist the media retention policy in the media cache.
///
/// # Arguments
///
/// * `policy` - The `MediaRetentionPolicy` to persist.
async fn set_media_retention_policy_inner(
&self,
policy: MediaRetentionPolicy,
) -> Result<(), Self::Error>;
/// Add a media file's content in the media cache.
///
/// # Arguments
///
/// * `request` - The `MediaRequestParameters` of the file.
///
/// * `content` - The content of the file.
///
/// * `current_time` - The current time, to set the last access time of the
/// media.
///
/// * `policy` - The media retention policy, to check whether the media is
/// too big to be cached.
///
/// * `ignore_policy` - Whether the `MediaRetentionPolicy` should be ignored
/// for this media. This setting should be persisted alongside the media
/// and taken into account whenever the policy is used.
async fn add_media_content_inner(
&self,
request: &MediaRequestParameters,
content: Vec<u8>,
current_time: SystemTime,
policy: MediaRetentionPolicy,
ignore_policy: IgnoreMediaRetentionPolicy,
) -> Result<(), Self::Error>;
/// Set whether the current [`MediaRetentionPolicy`] should be ignored for
/// the media.
///
/// If the media of the given request is not found, this should be a noop.
///
/// The change will be taken into account in the next cleanup.
///
/// # Arguments
///
/// * `request` - The `MediaRequestParameters` of the file.
///
/// * `ignore_policy` - Whether the current `MediaRetentionPolicy` should be
/// ignored.
async fn set_ignore_media_retention_policy_inner(
&self,
request: &MediaRequestParameters,
ignore_policy: IgnoreMediaRetentionPolicy,
) -> Result<(), Self::Error>;
/// Get a media file's content out of the media cache.
///
/// Returns `Ok(None)` if the media is not in the cache.
///
/// # Arguments
///
/// * `request` - The `MediaRequestParameters` of the file.
///
/// * `current_time` - The current time, to update the last access time of
/// the media.
async fn get_media_content_inner(
&self,
request: &MediaRequestParameters,
current_time: SystemTime,
) -> Result<Option<Vec<u8>>, Self::Error>;
/// Get a media file's content associated to an `MxcUri` from the
/// media store.
///
/// Returns `Ok(None)` if no media with this URI is in the cache.
///
/// # Arguments
///
/// * `uri` - The `MxcUri` of the media file.
///
/// * `current_time` - The current time, to update the last access time of
/// the media.
async fn get_media_content_for_uri_inner(
&self,
uri: &MxcUri,
current_time: SystemTime,
) -> Result<Option<Vec<u8>>, Self::Error>;
/// Clean up the media cache with the given policy.
///
/// For the integration tests, it is expected that content that does not
/// pass the last access expiry and max file size criteria will be
/// removed first. After that, the remaining cache size should be
/// computed to compare against the max cache size criteria.
///
/// # Arguments
///
/// * `policy` - The media retention policy to use for the cleanup.
///
/// * `current_time` - The current time, to be used to check for expired
/// content.
async fn clean_up_media_cache_inner(
&self,
policy: MediaRetentionPolicy,
current_time: SystemTime,
) -> Result<(), Self::Error>;
}
/// Tells whether the [`MediaRetentionPolicy`] should be ignored for the
/// current content.
///
/// Some media cache actions are noops when the media content that is processed
/// is filtered out by the policy. This can break some features of the SDK, like
/// the send queue, that expects to be able to persist all media files in the
/// store to restore them when the client is restored.
///
/// Use [`IgnoreMediaRetentionPolicy::is_yes()`] to convert this to a
/// boolean.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum IgnoreMediaRetentionPolicy {
    /// Ignore the media retention policy: the current action will not be a
    /// noop.
    ///
    /// Any media content in this state must NOT be used when applying a
    /// `MediaRetentionPolicy`. This applies to ANY criteria, like the maximum
    /// file size, the maximum cache size or the last access expiry.
    ///
    /// This state is supposed to be transient, and to only be used internally
    /// by the SDK.
    Yes,

    /// Respect the media retention policy: the current action might be a
    /// noop.
    No,
}

impl IgnoreMediaRetentionPolicy {
    /// Returns `true` for [`IgnoreMediaRetentionPolicy::Yes`] and `false` for
    /// [`IgnoreMediaRetentionPolicy::No`].
    pub fn is_yes(self) -> bool {
        self == Self::Yes
    }
}
/// An abstract trait to provide the current `SystemTime` for the
/// [`MediaService`].
///
/// Abstracting the clock allows to use a different source of time than the
/// system clock, for example a controllable one in tests.
pub trait TimeProvider {
    /// The current time.
    fn now(&self) -> SystemTime;
}

/// The default time provider, that calls `ruma::time::SystemTime::now()`.
///
/// This is a stateless unit struct, so it also implements `Clone`, `Copy`
/// and `Default`.
#[derive(Debug, Clone, Copy, Default)]
pub struct DefaultTimeProvider;

impl TimeProvider for DefaultTimeProvider {
    /// Returns the current time of the system clock.
    fn now(&self) -> SystemTime {
        SystemTime::now()
    }
}
#[cfg(test)]
mod tests {
use std::{fmt, sync::MutexGuard};
use async_trait::async_trait;
use matrix_sdk_common::locks::Mutex;
use matrix_sdk_test::async_test;
use ruma::{
events::room::MediaSource,
mxc_uri,
time::{Duration, SystemTime},
MxcUri, OwnedMxcUri,
};
use super::{EventCacheStoreMedia, IgnoreMediaRetentionPolicy, MediaService, TimeProvider};
use crate::{
event_cache::store::{media::MediaRetentionPolicy, EventCacheStoreError},
media::{MediaFormat, MediaRequestParameters, UniqueKey},
};
#[derive(Debug, Default)]
struct MockEventCacheStoreMedia {
inner: Mutex<MockEventCacheStoreMediaInner>,
}
impl MockEventCacheStoreMedia {
/// Whether the store was accessed.
fn accessed(&self) -> bool {
self.inner.lock().accessed
}
/// Reset the `accessed` boolean.
fn reset_accessed(&self) {
self.inner.lock().accessed = false;
}
/// Access the inner store.
///
/// Should be called for every access to the inner store as it also sets
/// the `accessed` boolean.
fn inner(&self) -> MutexGuard<'_, MockEventCacheStoreMediaInner> {
let mut inner = self.inner.lock();
inner.accessed = true;
inner
}
}
#[derive(Debug, Default)]
struct MockEventCacheStoreMediaInner {
/// Whether this store was accessed.
///
/// Must be set to `true` for any operation that unlocks the store.
accessed: bool,
/// The persisted media retention policy.
media_retention_policy: Option<MediaRetentionPolicy>,
/// The list of media content.
media_list: Vec<MediaContent>,
/// The time of the last cleanup.
cleanup_time: Option<SystemTime>,
}
#[derive(Debug, Clone)]
struct MediaContent {
/// The unique key for the media content.
key: String,
/// The original URI of the media content.
uri: OwnedMxcUri,
/// The media content.
content: Vec<u8>,
/// Whether the `MediaRetentionPolicy` should be ignored for this media
/// content;
ignore_policy: bool,
/// The time of the last access of the media content.
last_access: SystemTime,
}
#[derive(Debug)]
struct MockEventCacheStoreMediaError;
impl fmt::Display for MockEventCacheStoreMediaError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "MockEventCacheStoreMediaError")
}
}
impl std::error::Error for MockEventCacheStoreMediaError {}
impl From<MockEventCacheStoreMediaError> for EventCacheStoreError {
fn from(value: MockEventCacheStoreMediaError) -> Self {
Self::backend(value)
}
}
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
impl EventCacheStoreMedia for MockEventCacheStoreMedia {
    type Error = MockEventCacheStoreMediaError;

    /// Return the policy recorded in the mock, if one was set.
    async fn media_retention_policy_inner(
        &self,
    ) -> Result<Option<MediaRetentionPolicy>, Self::Error> {
        let stored_policy = self.inner().media_retention_policy;
        Ok(stored_policy)
    }

    /// Record `policy` in the mock.
    async fn set_media_retention_policy_inner(
        &self,
        policy: MediaRetentionPolicy,
    ) -> Result<(), Self::Error> {
        let mut inner = self.inner();
        inner.media_retention_policy = Some(policy);
        Ok(())
    }

    /// Insert the content in the media list, or overwrite the entry that has
    /// the same unique key.
    ///
    /// Content that exceeds the maximum file size of `policy` is dropped
    /// without touching the mock's state, unless `ignore_policy` is set.
    async fn add_media_content_inner(
        &self,
        request: &MediaRequestParameters,
        content: Vec<u8>,
        current_time: SystemTime,
        policy: MediaRetentionPolicy,
        ignore_policy: IgnoreMediaRetentionPolicy,
    ) -> Result<(), Self::Error> {
        let ignore_policy = ignore_policy.is_yes();

        let is_rejected = !ignore_policy && policy.exceeds_max_file_size(content.len());
        if is_rejected {
            return Ok(());
        }

        let mut inner = self.inner();
        let key = request.unique_key();

        match inner.media_list.iter_mut().find(|media| media.key == key) {
            Some(existing) => {
                // Same key: replace the data and refresh the metadata.
                existing.content = content;
                existing.last_access = current_time;
                existing.ignore_policy = ignore_policy;
            }
            None => inner.media_list.push(MediaContent {
                key,
                uri: request.uri().to_owned(),
                content,
                ignore_policy,
                last_access: current_time,
            }),
        }

        Ok(())
    }

    /// Update the `ignore_policy` flag of the entry matching the request.
    ///
    /// This does nothing if no entry matches.
    async fn set_ignore_media_retention_policy_inner(
        &self,
        request: &MediaRequestParameters,
        ignore_policy: IgnoreMediaRetentionPolicy,
    ) -> Result<(), Self::Error> {
        let key = request.unique_key();
        let mut inner = self.inner();

        if let Some(media) = inner.media_list.iter_mut().find(|media| media.key == key) {
            media.ignore_policy = ignore_policy.is_yes();
        }

        Ok(())
    }

    /// Return a copy of the content matching the request and set its last
    /// access time to `current_time`.
    async fn get_media_content_inner(
        &self,
        request: &MediaRequestParameters,
        current_time: SystemTime,
    ) -> Result<Option<Vec<u8>>, Self::Error> {
        let key = request.unique_key();
        let mut inner = self.inner();

        let loaded = inner.media_list.iter_mut().find(|media| media.key == key).map(|media| {
            media.last_access = current_time;
            media.content.clone()
        });

        Ok(loaded)
    }

    /// Return a copy of the first content stored for the URI and set its last
    /// access time to `current_time`.
    async fn get_media_content_for_uri_inner(
        &self,
        uri: &MxcUri,
        current_time: SystemTime,
    ) -> Result<Option<Vec<u8>>, Self::Error> {
        let mut inner = self.inner();

        match inner.media_list.iter().position(|media| media.uri == uri) {
            Some(index) => {
                let media = &mut inner.media_list[index];
                media.last_access = current_time;
                Ok(Some(media.content.clone()))
            }
            None => Ok(None),
        }
    }

    /// Record the time at which a cleanup was requested.
    async fn clean_up_media_cache_inner(
        &self,
        _policy: MediaRetentionPolicy,
        current_time: SystemTime,
    ) -> Result<(), Self::Error> {
        // Nothing is actually cleaned up in this test implementation: the tests
        // only check whether this method was called, and with which time.
        let mut inner = self.inner();
        inner.cleanup_time = Some(current_time);
        Ok(())
    }
}
/// A clock for tests, whose current time only changes when it is set
/// explicitly.
#[derive(Debug)]
struct MockTimeProvider {
    /// The time currently reported by this provider.
    now: Mutex<SystemTime>,
}

impl MockTimeProvider {
    /// Create a `MockTimeProvider` that reports `now` as the current time.
    fn new(now: SystemTime) -> Self {
        let now = Mutex::new(now);
        Self { now }
    }

    /// Replace the time reported by this provider.
    fn set_now(&self, now: SystemTime) {
        let mut current = self.now.lock();
        *current = now;
    }
}
impl TimeProvider for MockTimeProvider {
    /// Return the time that was last set on this provider.
    fn now(&self) -> SystemTime {
        let current = self.now.lock();
        *current
    }
}
/// Check how the `MediaService` drives the store when the default (empty)
/// `MediaRetentionPolicy` is in use: media can be added, read back by
/// request or by URI, flagged to ignore the policy, and a cleanup never
/// reaches the store.
#[async_test]
async fn test_media_service_empty_policy() {
    let content = b"some text content";
    let uri = mxc_uri!("mxc://server.local/AbcDe1234");
    let request = MediaRequestParameters {
        source: MediaSource::Plain(uri.to_owned()),
        format: MediaFormat::File,
    };

    let now = SystemTime::UNIX_EPOCH;

    let store = MockEventCacheStoreMedia::default();
    let service = MediaService::with_time_provider(MockTimeProvider::new(now));

    // By default an empty policy is used.
    assert!(!service.media_retention_policy().has_limitations());

    service.restore(None);
    assert!(!service.media_retention_policy().has_limitations());
    assert!(!store.accessed());

    // Add media.
    service
        .add_media_content(&store, &request, content.to_vec(), IgnoreMediaRetentionPolicy::No)
        .await
        .unwrap();
    assert!(store.accessed());

    let media_content = store.inner().media_list[0].clone();
    assert_eq!(media_content.uri, uri);
    assert_eq!(media_content.content, content);
    assert!(!media_content.ignore_policy);
    assert_eq!(media_content.last_access, now);

    let now = now + Duration::from_secs(60);
    service.time_provider.set_now(now);
    store.reset_accessed();

    // Get media from request.
    let loaded_content = service.get_media_content(&store, &request).await.unwrap();
    assert!(store.accessed());
    assert_eq!(loaded_content.as_deref(), Some(content.as_slice()));

    // The last access time was updated.
    let media = store.inner().media_list[0].clone();
    assert_eq!(media.last_access, now);

    let now = now + Duration::from_secs(60);
    service.time_provider.set_now(now);
    store.reset_accessed();

    // Get media from URI.
    let loaded_content = service.get_media_content_for_uri(&store, uri).await.unwrap();
    assert!(store.accessed());
    assert_eq!(loaded_content.as_deref(), Some(content.as_slice()));

    // The last access time was updated.
    let media = store.inner().media_list[0].clone();
    assert_eq!(media.last_access, now);

    // The flag is still set from the previous step; reset it, otherwise the
    // assertion below would pass even if the store was never reached.
    store.reset_accessed();

    // Update ignore_policy.
    service
        .set_ignore_media_retention_policy(&store, &request, IgnoreMediaRetentionPolicy::Yes)
        .await
        .unwrap();
    assert!(store.accessed());

    let media_content = store.inner().media_list[0].clone();
    assert!(media_content.ignore_policy);

    // Try a cleanup. With the empty policy the store should not be accessed.
    assert_eq!(store.inner().cleanup_time, None);
    store.reset_accessed();

    service.clean_up_media_cache(&store).await.unwrap();
    assert!(!store.accessed());
    assert_eq!(store.inner().cleanup_time, None);
}
/// Check how the `MediaService` drives the store when a policy limiting the
/// file size is set: small media goes through, big media is rejected before
/// the store is reached unless the policy is ignored, and a cleanup reaches
/// the store with the current time.
#[async_test]
async fn test_media_service_non_empty_policy() {
    // Content of less than 32 bytes.
    let small_content = b"some text content";
    let small_uri = mxc_uri!("mxc://server.local/small");
    let small_request = MediaRequestParameters {
        source: MediaSource::Plain(small_uri.to_owned()),
        format: MediaFormat::File,
    };

    // Content of more than 32 bytes.
    let big_content = b"some much much larger text content";
    let big_uri = mxc_uri!("mxc://server.local/big");
    let big_request = MediaRequestParameters {
        source: MediaSource::Plain(big_uri.to_owned()),
        format: MediaFormat::File,
    };

    // Limit the file size to 32 bytes in the retention policy.
    let policy = MediaRetentionPolicy { max_file_size: Some(32), ..Default::default() };

    let now = SystemTime::UNIX_EPOCH;

    let store = MockEventCacheStoreMedia::default();
    let service = MediaService::with_time_provider(MockTimeProvider::new(now));

    // Check that restoring the policy works.
    service.restore(Some(MediaRetentionPolicy::default()));
    assert_eq!(service.media_retention_policy(), MediaRetentionPolicy::default());
    assert!(!store.accessed());

    // Set the media retention policy.
    service.set_media_retention_policy(&store, policy).await.unwrap();
    assert!(store.accessed());
    assert_eq!(service.media_retention_policy(), policy);
    assert_eq!(store.inner().media_retention_policy, Some(policy));

    store.reset_accessed();

    // Add small media, it should work because its size is lower than the max file
    // size.
    service
        .add_media_content(
            &store,
            &small_request,
            small_content.to_vec(),
            IgnoreMediaRetentionPolicy::No,
        )
        .await
        .unwrap();
    assert!(store.accessed());

    let media_content = store.inner().media_list[0].clone();
    assert_eq!(media_content.uri, small_uri);
    assert_eq!(media_content.content, small_content);
    assert!(!media_content.ignore_policy);
    assert_eq!(media_content.last_access, now);

    let now = now + Duration::from_secs(60);
    service.time_provider.set_now(now);
    store.reset_accessed();

    // Get media from request.
    let loaded_content = service.get_media_content(&store, &small_request).await.unwrap();
    assert!(store.accessed());
    assert_eq!(loaded_content.as_deref(), Some(small_content.as_slice()));

    // The last access time was updated.
    let media = store.inner().media_list[0].clone();
    assert_eq!(media.last_access, now);

    let now = now + Duration::from_secs(60);
    service.time_provider.set_now(now);
    store.reset_accessed();

    // Get media from URI.
    let loaded_content = service.get_media_content_for_uri(&store, small_uri).await.unwrap();
    assert!(store.accessed());
    assert_eq!(loaded_content.as_deref(), Some(small_content.as_slice()));

    // The last access time was updated.
    let media = store.inner().media_list[0].clone();
    assert_eq!(media.last_access, now);

    let now = now + Duration::from_secs(60);
    service.time_provider.set_now(now);
    store.reset_accessed();

    // Add big media, it will not work because it is bigger than the max file size.
    service
        .add_media_content(
            &store,
            &big_request,
            big_content.to_vec(),
            IgnoreMediaRetentionPolicy::No,
        )
        .await
        .unwrap();
    assert!(!store.accessed());
    assert_eq!(store.inner().media_list.len(), 1);

    store.reset_accessed();

    let loaded_content = service.get_media_content(&store, &big_request).await.unwrap();
    assert!(store.accessed());
    assert_eq!(loaded_content, None);

    store.reset_accessed();

    let loaded_content = service.get_media_content_for_uri(&store, big_uri).await.unwrap();
    assert!(store.accessed());
    assert_eq!(loaded_content, None);

    // The flag is still set from the previous step; reset it, otherwise the
    // assertion after the next call would pass even if the store was never
    // reached.
    store.reset_accessed();

    // Add big media, but this time ignore the policy.
    service
        .add_media_content(
            &store,
            &big_request,
            big_content.to_vec(),
            IgnoreMediaRetentionPolicy::Yes,
        )
        .await
        .unwrap();
    assert!(store.accessed());
    assert_eq!(store.inner().media_list.len(), 2);

    // Move the clock forward, so that the last access time set by the read
    // below can be told apart from the time of the insertion above.
    let now = now + Duration::from_secs(60);
    service.time_provider.set_now(now);
    store.reset_accessed();

    // Get media from request.
    let loaded_content = service.get_media_content(&store, &big_request).await.unwrap();
    assert!(store.accessed());
    assert_eq!(loaded_content.as_deref(), Some(big_content.as_slice()));

    // The last access time was updated.
    let media = store.inner().media_list[1].clone();
    assert_eq!(media.last_access, now);

    let now = now + Duration::from_secs(60);
    service.time_provider.set_now(now);
    store.reset_accessed();

    // Get media from URI.
    let loaded_content = service.get_media_content_for_uri(&store, big_uri).await.unwrap();
    assert!(store.accessed());
    assert_eq!(loaded_content.as_deref(), Some(big_content.as_slice()));

    // The last access time was updated.
    let media = store.inner().media_list[1].clone();
    assert_eq!(media.last_access, now);

    // Try a cleanup, the store should be accessed.
    assert_eq!(store.inner().cleanup_time, None);

    let now = now + Duration::from_secs(60);
    service.time_provider.set_now(now);
    store.reset_accessed();

    service.clean_up_media_cache(&store).await.unwrap();
    assert!(store.accessed());
    assert_eq!(store.inner().cleanup_time, Some(now));
}
}

View File

@@ -0,0 +1,28 @@
// Copyright 2025 Kévin Commaille
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Types and traits regarding media caching of the event cache store.
mod media_retention_policy;
mod media_service;
#[cfg(any(test, feature = "testing"))]
#[macro_use]
pub mod integration_tests;
#[cfg(any(test, feature = "testing"))]
pub use self::integration_tests::EventCacheStoreMediaIntegrationTests;
pub use self::{
media_retention_policy::MediaRetentionPolicy,
media_service::{EventCacheStoreMedia, IgnoreMediaRetentionPolicy, MediaService},
};

View File

@@ -20,9 +20,15 @@ use matrix_sdk_common::{
ring_buffer::RingBuffer,
store_locks::memory_store_helper::try_take_leased_lock,
};
use ruma::{time::Instant, MxcUri, OwnedMxcUri, RoomId};
use ruma::{
time::{Instant, SystemTime},
MxcUri, OwnedMxcUri, RoomId,
};
use super::{EventCacheStore, EventCacheStoreError, Result};
use super::{
media::{EventCacheStoreMedia, IgnoreMediaRetentionPolicy, MediaRetentionPolicy, MediaService},
EventCacheStore, EventCacheStoreError, Result,
};
use crate::{
event_cache::{Event, Gap},
media::{MediaRequestParameters, UniqueKey as _},
@@ -35,13 +41,34 @@ use crate::{
#[derive(Debug)]
pub struct MemoryStore {
inner: StdRwLock<MemoryStoreInner>,
media_service: MediaService,
}
#[derive(Debug)]
struct MemoryStoreInner {
media: RingBuffer<(OwnedMxcUri, String /* unique key */, Vec<u8>)>,
media: RingBuffer<MediaContent>,
leases: HashMap<String, (String, Instant)>,
events: RelationalLinkedChunk<Event, Gap>,
media_retention_policy: Option<MediaRetentionPolicy>,
}
/// A media content in the `MemoryStore`.
///
/// Entries are kept in the `media` ring buffer of `MemoryStoreInner`.
#[derive(Debug)]
struct MediaContent {
/// The URI of the content.
uri: OwnedMxcUri,
/// The unique key of the content, as computed by
/// `MediaRequestParameters::unique_key()`.
key: String,
/// The bytes of the content.
data: Vec<u8>,
/// Whether we should ignore the [`MediaRetentionPolicy`] for this content.
ignore_policy: bool,
/// The time of the last access of the content.
last_access: SystemTime,
}
// SAFETY: `new_unchecked` is safe because 20 is not zero.
@@ -54,7 +81,10 @@ impl Default for MemoryStore {
media: RingBuffer::new(NUMBER_OF_MEDIAS),
leases: Default::default(),
events: RelationalLinkedChunk::new(),
media_retention_policy: None,
}),
// No need to call `restore()` since nothing is persisted.
media_service: MediaService::new(),
}
}
}
@@ -113,15 +143,9 @@ impl EventCacheStore for MemoryStore {
&self,
request: &MediaRequestParameters,
data: Vec<u8>,
ignore_policy: IgnoreMediaRetentionPolicy,
) -> Result<()> {
// Avoid duplication. Let's try to remove it first.
self.remove_media_content(request).await?;
// Now, let's add it.
let mut inner = self.inner.write().unwrap();
inner.media.push((request.uri().to_owned(), request.unique_key(), data));
Ok(())
self.media_service.add_media_content(self, request, data, ignore_policy).await
}
async fn replace_media_key(
@@ -133,23 +157,18 @@ impl EventCacheStore for MemoryStore {
let mut inner = self.inner.write().unwrap();
if let Some((mxc, key, _)) = inner.media.iter_mut().find(|(_, key, _)| *key == expected_key)
if let Some(media_content) =
inner.media.iter_mut().find(|media_content| media_content.key == expected_key)
{
*mxc = to.uri().to_owned();
*key = to.unique_key();
media_content.uri = to.uri().to_owned();
media_content.key = to.unique_key();
}
Ok(())
}
async fn get_media_content(&self, request: &MediaRequestParameters) -> Result<Option<Vec<u8>>> {
let expected_key = request.unique_key();
let inner = self.inner.read().unwrap();
Ok(inner.media.iter().find_map(|(_media_uri, media_key, media_content)| {
(media_key == &expected_key).then(|| media_content.to_owned())
}))
self.media_service.get_media_content(self, request).await
}
async fn remove_media_content(&self, request: &MediaRequestParameters) -> Result<()> {
@@ -157,10 +176,8 @@ impl EventCacheStore for MemoryStore {
let mut inner = self.inner.write().unwrap();
let Some(index) = inner
.media
.iter()
.position(|(_media_uri, media_key, _media_content)| media_key == &expected_key)
let Some(index) =
inner.media.iter().position(|media_content| media_content.key == expected_key)
else {
return Ok(());
};
@@ -174,24 +191,17 @@ impl EventCacheStore for MemoryStore {
&self,
uri: &MxcUri,
) -> Result<Option<Vec<u8>>, Self::Error> {
let inner = self.inner.read().unwrap();
Ok(inner.media.iter().find_map(|(media_uri, _media_key, media_content)| {
(media_uri == uri).then(|| media_content.to_owned())
}))
self.media_service.get_media_content_for_uri(self, uri).await
}
async fn remove_media_content_for_uri(&self, uri: &MxcUri) -> Result<()> {
let mut inner = self.inner.write().unwrap();
let expected_key = uri.to_owned();
let positions = inner
.media
.iter()
.enumerate()
.filter_map(|(position, (media_uri, _media_key, _media_content))| {
(media_uri == &expected_key).then_some(position)
})
.filter_map(|(position, media_content)| (media_content.uri == uri).then_some(position))
.collect::<Vec<_>>();
// Iterate in reverse-order so that positions stay valid after first removals.
@@ -201,16 +211,240 @@ impl EventCacheStore for MemoryStore {
Ok(())
}
// Delegates to the `MediaService`, handing it this store as the backend.
async fn set_media_retention_policy(
&self,
policy: MediaRetentionPolicy,
) -> Result<(), Self::Error> {
self.media_service.set_media_retention_policy(self, policy).await
}
// Returns the policy as known by the `MediaService`; the store is not
// passed to this call.
fn media_retention_policy(&self) -> MediaRetentionPolicy {
self.media_service.media_retention_policy()
}
// Delegates to the `MediaService`, handing it this store as the backend.
async fn set_ignore_media_retention_policy(
&self,
request: &MediaRequestParameters,
ignore_policy: IgnoreMediaRetentionPolicy,
) -> Result<(), Self::Error> {
self.media_service.set_ignore_media_retention_policy(self, request, ignore_policy).await
}
// Delegates to the `MediaService`, handing it this store as the backend.
async fn clean_up_media_cache(&self) -> Result<(), Self::Error> {
self.media_service.clean_up_media_cache(self).await
}
}
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
impl EventCacheStoreMedia for MemoryStore {
    type Error = EventCacheStoreError;

    /// Return the policy kept in memory, if one was set.
    async fn media_retention_policy_inner(
        &self,
    ) -> Result<Option<MediaRetentionPolicy>, Self::Error> {
        let inner = self.inner.read().unwrap();
        Ok(inner.media_retention_policy)
    }

    /// Keep `policy` in memory.
    async fn set_media_retention_policy_inner(
        &self,
        policy: MediaRetentionPolicy,
    ) -> Result<(), Self::Error> {
        let mut inner = self.inner.write().unwrap();
        inner.media_retention_policy = Some(policy);
        Ok(())
    }

    /// Store the content at the end of the media buffer.
    ///
    /// Any content previously stored for the same request is removed first,
    /// even when the new content ends up being rejected because it exceeds the
    /// maximum file size of `policy`.
    async fn add_media_content_inner(
        &self,
        request: &MediaRequestParameters,
        data: Vec<u8>,
        last_access: SystemTime,
        policy: MediaRetentionPolicy,
        ignore_policy: IgnoreMediaRetentionPolicy,
    ) -> Result<(), Self::Error> {
        // The buffer must never hold two entries for the same request.
        self.remove_media_content(request).await?;

        let ignore_policy = ignore_policy.is_yes();

        let is_too_big = !ignore_policy && policy.exceeds_max_file_size(data.len());
        if is_too_big {
            // This content must not be cached.
            return Ok(());
        }

        let content = MediaContent {
            uri: request.uri().to_owned(),
            key: request.unique_key(),
            data,
            ignore_policy,
            last_access,
        };
        self.inner.write().unwrap().media.push(content);

        Ok(())
    }

    /// Update the `ignore_policy` flag of the content matching the request.
    ///
    /// This does nothing if no content matches.
    async fn set_ignore_media_retention_policy_inner(
        &self,
        request: &MediaRequestParameters,
        ignore_policy: IgnoreMediaRetentionPolicy,
    ) -> Result<(), Self::Error> {
        let mut inner = self.inner.write().unwrap();
        let expected_key = request.unique_key();

        let found = inner.media.iter_mut().find(|media| media.key == expected_key);
        if let Some(media_content) = found {
            media_content.ignore_policy = ignore_policy.is_yes();
        }

        Ok(())
    }

    /// Return a copy of the data matching the request.
    ///
    /// On a hit, the content gets `current_time` as its last access time and
    /// is moved to the end of the buffer.
    async fn get_media_content_inner(
        &self,
        request: &MediaRequestParameters,
        current_time: SystemTime,
    ) -> Result<Option<Vec<u8>>, Self::Error> {
        let mut inner = self.inner.write().unwrap();
        let expected_key = request.unique_key();

        // Take the content out of the buffer; it is pushed back at the end below.
        let position = inner.media.iter().position(|media| media.key == expected_key);
        let taken = match position {
            Some(index) => inner.media.remove(index),
            None => None,
        };

        match taken {
            Some(mut content) => {
                content.last_access = current_time;
                let data = content.data.clone();
                inner.media.push(content);
                Ok(Some(data))
            }
            None => Ok(None),
        }
    }

    /// Return a copy of the data of the first content stored for the URI.
    ///
    /// On a hit, the content gets `current_time` as its last access time and
    /// is moved to the end of the buffer.
    async fn get_media_content_for_uri_inner(
        &self,
        expected_uri: &MxcUri,
        current_time: SystemTime,
    ) -> Result<Option<Vec<u8>>, Self::Error> {
        let mut inner = self.inner.write().unwrap();

        // Take the content out of the buffer; it is pushed back at the end below.
        let position = inner.media.iter().position(|media| media.uri == expected_uri);
        let taken = match position {
            Some(index) => inner.media.remove(index),
            None => None,
        };

        match taken {
            Some(mut content) => {
                content.last_access = current_time;
                let data = content.data.clone();
                inner.media.push(content);
                Ok(Some(data))
            }
            None => Ok(None),
        }
    }

    /// Remove the contents that do not comply with `policy`.
    ///
    /// Three passes are applied in order: the file size limit, the expiry of
    /// the last access, then the total cache size. Contents flagged with
    /// `ignore_policy` are never removed nor counted.
    async fn clean_up_media_cache_inner(
        &self,
        policy: MediaRetentionPolicy,
        current_time: SystemTime,
    ) -> Result<(), Self::Error> {
        if !policy.has_limitations() {
            // Nothing can be removed with such a policy.
            return Ok(());
        }

        let mut inner = self.inner.write().unwrap();

        // Pass 1: contents bigger than the max file size.
        if policy.computed_max_file_size().is_some() {
            inner.media.retain(|content| {
                !(!content.ignore_policy && policy.exceeds_max_file_size(content.data.len()))
            });
        }

        // Pass 2: contents that were not accessed recently enough.
        if policy.last_access_expiry.is_some() {
            inner.media.retain(|content| {
                !(!content.ignore_policy
                    && policy.has_content_expired(current_time, content.last_access))
            });
        }

        // Pass 3: the oldest contents, until the cache size fits.
        if let Some(max_cache_size) = policy.max_cache_size {
            let mut cache_size = 0usize;
            // Becomes `true` with the first content that does not fit; every
            // counted content visited after that point is removed as well.
            let mut limit_reached = false;
            let mut items_to_remove = Vec::with_capacity(NUMBER_OF_MEDIAS.into());

            // The oldest contents are at the start of the buffer, so walk it
            // backwards to sum up the sizes of the most recent ones first.
            for (index, content) in inner.media.iter().enumerate().rev() {
                if content.ignore_policy {
                    // It does not count towards the cache size.
                    continue;
                }

                if !limit_reached {
                    limit_reached = match cache_size.checked_add(content.data.len()) {
                        Some(sum) => {
                            cache_size = sum;
                            sum > max_cache_size
                        }
                        // The sum overflowed, so it is necessarily bigger than
                        // the max cache size.
                        None => true,
                    };
                }

                if limit_reached {
                    items_to_remove.push(index);
                }
            }

            // The indexes were collected from the biggest to the smallest, so
            // each removal leaves the remaining indexes valid.
            for index in items_to_remove {
                inner.media.remove(index);
            }
        }

        Ok(())
    }
}
#[cfg(test)]
mod tests {
use super::{EventCacheStore, MemoryStore, Result};
use super::{MemoryStore, Result};
use crate::event_cache_store_media_integration_tests;
async fn get_event_cache_store() -> Result<impl EventCacheStore> {
async fn get_event_cache_store() -> Result<MemoryStore> {
Ok(MemoryStore::new())
}
event_cache_store_integration_tests!();
event_cache_store_integration_tests_time!();
event_cache_store_media_integration_tests!(with_media_size_tests);
}

View File

@@ -24,6 +24,7 @@ use std::{fmt, ops::Deref, str::Utf8Error, sync::Arc};
#[cfg(any(test, feature = "testing"))]
#[macro_use]
pub mod integration_tests;
pub mod media;
mod memory_store;
mod traits;

View File

@@ -21,7 +21,10 @@ use matrix_sdk_common::{
};
use ruma::{MxcUri, RoomId};
use super::EventCacheStoreError;
use super::{
media::{IgnoreMediaRetentionPolicy, MediaRetentionPolicy},
EventCacheStoreError,
};
use crate::{
event_cache::{Event, Gap},
media::MediaRequestParameters,
@@ -88,6 +91,7 @@ pub trait EventCacheStore: AsyncTraitDeps {
&self,
request: &MediaRequestParameters,
content: Vec<u8>,
ignore_policy: IgnoreMediaRetentionPolicy,
) -> Result<(), Self::Error>;
/// Replaces the given media's content key with another one.
@@ -162,6 +166,42 @@ pub trait EventCacheStore: AsyncTraitDeps {
///
/// * `uri` - The `MxcUri` of the media files.
async fn remove_media_content_for_uri(&self, uri: &MxcUri) -> Result<(), Self::Error>;
/// Set the `MediaRetentionPolicy` to use for deciding whether to store or
/// keep media content.
///
/// # Arguments
///
/// * `policy` - The `MediaRetentionPolicy` to use.
async fn set_media_retention_policy(
&self,
policy: MediaRetentionPolicy,
) -> Result<(), Self::Error>;
/// Get the current `MediaRetentionPolicy`.
fn media_retention_policy(&self) -> MediaRetentionPolicy;
/// Set whether the current [`MediaRetentionPolicy`] should be ignored for
/// the media.
///
/// The change will be taken into account in the next cleanup.
///
/// # Arguments
///
/// * `request` - The `MediaRequestParameters` of the file.
///
/// * `ignore_policy` - Whether the current `MediaRetentionPolicy` should be
/// ignored.
async fn set_ignore_media_retention_policy(
&self,
request: &MediaRequestParameters,
ignore_policy: IgnoreMediaRetentionPolicy,
) -> Result<(), Self::Error>;
/// Clean up the media cache with the current `MediaRetentionPolicy`.
///
/// If there is already an ongoing cleanup, this is a noop.
async fn clean_up_media_cache(&self) -> Result<(), Self::Error>;
}
#[repr(transparent)]
@@ -211,8 +251,9 @@ impl<T: EventCacheStore> EventCacheStore for EraseEventCacheStoreError<T> {
&self,
request: &MediaRequestParameters,
content: Vec<u8>,
ignore_policy: IgnoreMediaRetentionPolicy,
) -> Result<(), Self::Error> {
self.0.add_media_content(request, content).await.map_err(Into::into)
self.0.add_media_content(request, content, ignore_policy).await.map_err(Into::into)
}
async fn replace_media_key(
@@ -247,6 +288,29 @@ impl<T: EventCacheStore> EventCacheStore for EraseEventCacheStoreError<T> {
async fn remove_media_content_for_uri(&self, uri: &MxcUri) -> Result<(), Self::Error> {
self.0.remove_media_content_for_uri(uri).await.map_err(Into::into)
}
// Forwards to the wrapped store and converts its error with `Into`.
async fn set_media_retention_policy(
&self,
policy: MediaRetentionPolicy,
) -> Result<(), Self::Error> {
self.0.set_media_retention_policy(policy).await.map_err(Into::into)
}
// Forwards to the wrapped store; this call returns no error to convert.
fn media_retention_policy(&self) -> MediaRetentionPolicy {
self.0.media_retention_policy()
}
// Forwards to the wrapped store and converts its error with `Into`.
async fn set_ignore_media_retention_policy(
&self,
request: &MediaRequestParameters,
ignore_policy: IgnoreMediaRetentionPolicy,
) -> Result<(), Self::Error> {
self.0.set_ignore_media_retention_policy(request, ignore_policy).await.map_err(Into::into)
}
// Forwards to the wrapped store and converts its error with `Into`.
async fn clean_up_media_cache(&self) -> Result<(), Self::Error> {
self.0.clean_up_media_cache().await.map_err(Into::into)
}
}
/// A type-erased [`EventCacheStore`].

View File

@@ -6,6 +6,13 @@ All notable changes to this project will be documented in this file.
## [Unreleased] - ReleaseDate
### Features
- [**breaking**] `SqliteEventCacheStore` implements the new APIs of
`EventCacheStore` for `MediaRetentionPolicy`. See the changelog of
`matrix-sdk-base` for more details.
([#4571](https://github.com/matrix-org/matrix-rust-sdk/pull/4571))
## [0.9.0] - 2024-12-18
### Features

View File

@@ -0,0 +1,3 @@
-- Add an ignore_policy column, defaulting to FALSE for all media content.
ALTER TABLE "media"
ADD COLUMN "ignore_policy" BOOLEAN NOT NULL DEFAULT FALSE;

View File

@@ -19,23 +19,40 @@ use std::{borrow::Cow, fmt, path::Path, sync::Arc};
use async_trait::async_trait;
use deadpool_sqlite::{Object as SqliteAsyncConn, Pool as SqlitePool, Runtime};
use matrix_sdk_base::{
event_cache::{store::EventCacheStore, Event, Gap},
event_cache::{
store::{
media::{
EventCacheStoreMedia, IgnoreMediaRetentionPolicy, MediaRetentionPolicy,
MediaService,
},
EventCacheStore,
},
Event, Gap,
},
linked_chunk::{ChunkContent, ChunkIdentifier, RawChunk, Update},
media::{MediaRequestParameters, UniqueKey},
};
use matrix_sdk_store_encryption::StoreCipher;
use ruma::{MilliSecondsSinceUnixEpoch, RoomId};
use rusqlite::{OptionalExtension, Transaction, TransactionBehavior};
use ruma::{time::SystemTime, MilliSecondsSinceUnixEpoch, MxcUri, RoomId};
use rusqlite::{params_from_iter, OptionalExtension, Transaction, TransactionBehavior};
use tokio::fs;
#[cfg(not(test))]
use tracing::warn;
use tracing::{debug, trace};
use crate::{
error::{Error, Result},
utils::{Key, SqliteAsyncConnExt, SqliteKeyValueStoreAsyncConnExt, SqliteKeyValueStoreConnExt},
utils::{
repeat_vars, time_to_timestamp, Key, SqliteAsyncConnExt, SqliteKeyValueStoreAsyncConnExt,
SqliteKeyValueStoreConnExt, SqliteTransactionExt,
},
OpenStoreError,
};
mod keys {
// Entries in Key-value store
pub const MEDIA_RETENTION_POLICY: &str = "media_retention_policy";
// Tables
pub const LINKED_CHUNKS: &str = "linked_chunks";
pub const MEDIA: &str = "media";
@@ -46,7 +63,7 @@ mod keys {
/// This is used to figure whether the SQLite database requires a migration.
/// Every new SQL migration should imply a bump of this number, and changes in
/// the [`run_migrations`] function.
const DATABASE_VERSION: u8 = 3;
const DATABASE_VERSION: u8 = 4;
/// The string used to identify a chunk of type events, in the `type` field in
/// the database.
@@ -60,6 +77,7 @@ const CHUNK_TYPE_GAP_TYPE_STRING: &str = "G";
pub struct SqliteEventCacheStore {
store_cipher: Option<Arc<StoreCipher>>,
pool: SqlitePool,
media_service: Arc<MediaService>,
}
#[cfg(not(tarpaulin_include))]
@@ -96,7 +114,11 @@ impl SqliteEventCacheStore {
None => None,
};
Ok(Self { store_cipher, pool })
let media_service = MediaService::new();
let media_retention_policy = media_retention_policy(&conn).await?;
media_service.restore(media_retention_policy);
Ok(Self { store_cipher, pool, media_service: Arc::new(media_service) })
}
fn encode_value(&self, value: Vec<u8>) -> Result<Vec<u8>> {
@@ -303,6 +325,16 @@ async fn run_migrations(conn: &SqliteAsyncConn, version: u8) -> Result<()> {
.await?;
}
if version < 4 {
conn.with_transaction(|txn| {
txn.execute_batch(include_str!(
"../migrations/event_cache_store/004_ignore_policy.sql"
))?;
txn.set_db_version(4)
})
.await?;
}
Ok(())
}
@@ -591,19 +623,9 @@ impl EventCacheStore for SqliteEventCacheStore {
&self,
request: &MediaRequestParameters,
content: Vec<u8>,
ignore_policy: IgnoreMediaRetentionPolicy,
) -> Result<()> {
let uri = self.encode_key(keys::MEDIA, request.source.unique_key());
let format = self.encode_key(keys::MEDIA, request.format.unique_key());
let data = self.encode_value(content)?;
let conn = self.acquire().await?;
conn.execute(
"INSERT OR REPLACE INTO media (uri, format, data, last_access) VALUES (?, ?, ?, CAST(strftime('%s') as INT))",
(uri, format, data),
)
.await?;
Ok(())
self.media_service.add_media_content(self, request, content, ignore_policy).await
}
async fn replace_media_key(
@@ -619,8 +641,7 @@ impl EventCacheStore for SqliteEventCacheStore {
let conn = self.acquire().await?;
conn.execute(
r#"UPDATE media SET uri = ?, format = ?, last_access = CAST(strftime('%s') as INT)
WHERE uri = ? AND format = ?"#,
r#"UPDATE media SET uri = ?, format = ? WHERE uri = ? AND format = ?"#,
(new_uri, new_format, prev_uri, prev_format),
)
.await?;
@@ -629,31 +650,7 @@ impl EventCacheStore for SqliteEventCacheStore {
}
async fn get_media_content(&self, request: &MediaRequestParameters) -> Result<Option<Vec<u8>>> {
let uri = self.encode_key(keys::MEDIA, request.source.unique_key());
let format = self.encode_key(keys::MEDIA, request.format.unique_key());
let conn = self.acquire().await?;
let data = conn
.with_transaction::<_, rusqlite::Error, _>(move |txn| {
// Update the last access.
// We need to do this first so the transaction is in write mode right away.
// See: https://sqlite.org/lang_transaction.html#read_transactions_versus_write_transactions
txn.execute(
"UPDATE media SET last_access = CAST(strftime('%s') as INT) \
WHERE uri = ? AND format = ?",
(&uri, &format),
)?;
txn.query_row::<Vec<u8>, _, _>(
"SELECT data FROM media WHERE uri = ? AND format = ?",
(&uri, &format),
|row| row.get(0),
)
.optional()
})
.await?;
data.map(|v| self.decode_value(&v).map(Into::into)).transpose()
self.media_service.get_media_content(self, request).await
}
async fn remove_media_content(&self, request: &MediaRequestParameters) -> Result<()> {
@@ -668,9 +665,125 @@ impl EventCacheStore for SqliteEventCacheStore {
async fn get_media_content_for_uri(
&self,
uri: &ruma::MxcUri,
uri: &MxcUri,
) -> Result<Option<Vec<u8>>, Self::Error> {
self.media_service.get_media_content_for_uri(self, uri).await
}
// Deletes every row of the `media` table stored for this URI, whatever its
// format: the `WHERE` clause only filters on the `uri` column.
async fn remove_media_content_for_uri(&self, uri: &MxcUri) -> Result<()> {
// The column holds encoded keys, so the lookup value is encoded the same way.
let uri = self.encode_key(keys::MEDIA, uri);
let conn = self.acquire().await?;
conn.execute("DELETE FROM media WHERE uri = ?", (uri,)).await?;
Ok(())
}
// Delegates to the `MediaService`, handing it this store as the backend.
async fn set_media_retention_policy(
&self,
policy: MediaRetentionPolicy,
) -> Result<(), Self::Error> {
self.media_service.set_media_retention_policy(self, policy).await
}
// Returns the policy as known by the `MediaService`; the database is not
// queried by this call.
fn media_retention_policy(&self) -> MediaRetentionPolicy {
self.media_service.media_retention_policy()
}
// Delegates to the `MediaService`, handing it this store as the backend.
async fn set_ignore_media_retention_policy(
&self,
request: &MediaRequestParameters,
ignore_policy: IgnoreMediaRetentionPolicy,
) -> Result<(), Self::Error> {
self.media_service.set_ignore_media_retention_policy(self, request, ignore_policy).await
}
// Delegates to the `MediaService`, handing it this store as the backend.
async fn clean_up_media_cache(&self) -> Result<(), Self::Error> {
self.media_service.clean_up_media_cache(self).await
}
}
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
impl EventCacheStoreMedia for SqliteEventCacheStore {
type Error = Error;
// Loads the persisted policy through the free function
// `media_retention_policy()`, using a connection acquired for this call.
async fn media_retention_policy_inner(
&self,
) -> Result<Option<MediaRetentionPolicy>, Self::Error> {
let conn = self.acquire().await?;
media_retention_policy(&conn).await
}
// Persists the policy in the key-value store, under the
// `keys::MEDIA_RETENTION_POLICY` entry.
async fn set_media_retention_policy_inner(
&self,
policy: MediaRetentionPolicy,
) -> Result<(), Self::Error> {
let conn = self.acquire().await?;
// The policy is stored as the bytes produced by `rmp_serde::to_vec_named`.
let serialized_policy = rmp_serde::to_vec_named(&policy)?;
conn.set_kv(keys::MEDIA_RETENTION_POLICY, serialized_policy).await?;
Ok(())
}
// Inserts the content in the `media` table, or replaces the row that is
// already stored for the same request.
//
// Nothing is written when the content exceeds the maximum file size of
// `policy`, unless `ignore_policy` is set.
async fn add_media_content_inner(
&self,
request: &MediaRequestParameters,
data: Vec<u8>,
last_access: SystemTime,
policy: MediaRetentionPolicy,
ignore_policy: IgnoreMediaRetentionPolicy,
) -> Result<(), Self::Error> {
let ignore_policy = ignore_policy.is_yes();
// The value is encoded first: the size check below applies to the length of
// the encoded data, i.e. of what would be written to the database.
let data = self.encode_value(data)?;
if !ignore_policy && policy.exceeds_max_file_size(data.len()) {
return Ok(());
}
let uri = self.encode_key(keys::MEDIA, request.source.unique_key());
let format = self.encode_key(keys::MEDIA, request.format.unique_key());
// The last access time is the one provided by the caller, converted with
// `time_to_timestamp()`.
let timestamp = time_to_timestamp(last_access);
let conn = self.acquire().await?;
conn.execute(
"INSERT OR REPLACE INTO media (uri, format, data, last_access, ignore_policy) VALUES (?, ?, ?, ?, ?)",
(uri, format, data, timestamp, ignore_policy),
)
.await?;
Ok(())
}
/// Update the `ignore_policy` flag of the row matching the source and format
/// of `request`.
///
/// This is a plain `UPDATE`: if no row matches, nothing is changed and
/// `Ok(())` is still returned.
async fn set_ignore_media_retention_policy_inner(
    &self,
    request: &MediaRequestParameters,
    ignore_policy: IgnoreMediaRetentionPolicy,
) -> Result<(), Self::Error> {
    let encoded_uri = self.encode_key(keys::MEDIA, request.source.unique_key());
    let encoded_format = self.encode_key(keys::MEDIA, request.format.unique_key());
    let bypass_policy = ignore_policy.is_yes();

    self.acquire()
        .await?
        .execute(
            r#"UPDATE media SET ignore_policy = ? WHERE uri = ? AND format = ?"#,
            (bypass_policy, encoded_uri, encoded_format),
        )
        .await?;

    Ok(())
}
async fn get_media_content_inner(
&self,
request: &MediaRequestParameters,
current_time: SystemTime,
) -> Result<Option<Vec<u8>>, Self::Error> {
let uri = self.encode_key(keys::MEDIA, request.source.unique_key());
let format = self.encode_key(keys::MEDIA, request.format.unique_key());
let timestamp = time_to_timestamp(current_time);
let conn = self.acquire().await?;
let data = conn
.with_transaction::<_, rusqlite::Error, _>(move |txn| {
@@ -678,11 +791,38 @@ impl EventCacheStore for SqliteEventCacheStore {
// We need to do this first so the transaction is in write mode right away.
// See: https://sqlite.org/lang_transaction.html#read_transactions_versus_write_transactions
txn.execute(
"UPDATE media SET last_access = CAST(strftime('%s') as INT) \
WHERE uri = ?",
(&uri,),
"UPDATE media SET last_access = ? WHERE uri = ? AND format = ?",
(timestamp, &uri, &format),
)?;
txn.query_row::<Vec<u8>, _, _>(
"SELECT data FROM media WHERE uri = ? AND format = ?",
(&uri, &format),
|row| row.get(0),
)
.optional()
})
.await?;
data.map(|v| self.decode_value(&v).map(Into::into)).transpose()
}
async fn get_media_content_for_uri_inner(
&self,
uri: &MxcUri,
current_time: SystemTime,
) -> Result<Option<Vec<u8>>, Self::Error> {
let uri = self.encode_key(keys::MEDIA, uri);
let timestamp = time_to_timestamp(current_time);
let conn = self.acquire().await?;
let data = conn
.with_transaction::<_, rusqlite::Error, _>(move |txn| {
// Update the last access.
// We need to do this first so the transaction is in write mode right away.
// See: https://sqlite.org/lang_transaction.html#read_transactions_versus_write_transactions
txn.execute("UPDATE media SET last_access = ? WHERE uri = ?", (timestamp, &uri))?;
txn.query_row::<Vec<u8>, _, _>(
"SELECT data FROM media WHERE uri = ?",
(&uri,),
@@ -695,11 +835,140 @@ impl EventCacheStore for SqliteEventCacheStore {
data.map(|v| self.decode_value(&v).map(Into::into)).transpose()
}
async fn remove_media_content_for_uri(&self, uri: &ruma::MxcUri) -> Result<()> {
let uri = self.encode_key(keys::MEDIA, uri);
/// Remove from the `media` table the contents that do not fit the given
/// policy, then `VACUUM` the database if anything was removed.
///
/// Only rows where `ignore_policy IS FALSE` are considered. Inside a single
/// transaction, three passes are made, in this order:
///
/// 1. Delete contents whose stored length exceeds
///    `policy.computed_max_file_size()`, if set.
/// 2. Delete contents whose last access is at least
///    `policy.last_access_expiry` seconds before `current_time`, if set.
/// 3. If `policy.max_cache_size` is set and the remaining total size exceeds
///    it, keep the most recently accessed contents that fit and delete the
///    rest.
///
/// Returns early without touching the database when the policy has no
/// limitations.
async fn clean_up_media_cache_inner(
&self,
policy: MediaRetentionPolicy,
current_time: SystemTime,
) -> Result<(), Self::Error> {
if !policy.has_limitations() {
// We can safely skip all the checks.
return Ok(());
}
let conn = self.acquire().await?;
// NOTE(review): the statement below uses `uri`, which is not bound anywhere
// in this function. It looks like a leftover of the former
// `remove_media_content_for_uri` body that this diff moved — confirm and
// drop it.
conn.execute("DELETE FROM media WHERE uri = ?", (uri,)).await?;
// `removed` tells whether any pass deleted at least one row.
let removed = conn
.with_transaction::<_, Error, _>(move |txn| {
let mut removed = false;
// First, check media content that exceed the max filesize.
if let Some(max_file_size) = policy.computed_max_file_size() {
let count = txn.execute(
"DELETE FROM media WHERE ignore_policy IS FALSE AND length(data) > ?",
(max_file_size,),
)?;
if count > 0 {
removed = true;
}
}
// Then, clean up expired media content.
if let Some(last_access_expiry) = policy.last_access_expiry {
let current_timestamp = time_to_timestamp(current_time);
let expiry_secs = last_access_expiry.as_secs();
let count = txn.execute(
"DELETE FROM media WHERE ignore_policy IS FALSE AND (? - last_access) >= ?",
(current_timestamp, expiry_secs),
)?;
if count > 0 {
removed = true;
}
}
// Finally, if the cache size is too big, remove old items until it fits.
if let Some(max_cache_size) = policy.max_cache_size {
// i64 is the integer type used by SQLite, use it here to avoid usize overflow
// during the conversion of the result.
let cache_size_int = txn
.query_row(
"SELECT sum(length(data)) FROM media WHERE ignore_policy IS FALSE",
(),
|row| {
// `sum()` returns `NULL` if there are no rows.
row.get::<_, Option<i64>>(0)
},
)?
.unwrap_or_default();
let cache_size_usize = usize::try_from(cache_size_int);
// If the cache size is overflowing or bigger than max cache size, clean up.
if cache_size_usize.is_err()
|| cache_size_usize.is_ok_and(|cache_size| cache_size > max_cache_size)
{
// Get the sizes of the media contents ordered by last access.
// The most recently accessed rows come first (`DESC`), so they are the
// ones that get to stay within the size budget.
let mut cached_stmt = txn.prepare_cached(
"SELECT rowid, length(data) FROM media \
WHERE ignore_policy IS FALSE ORDER BY last_access DESC",
)?;
let content_sizes = cached_stmt
.query(())?
.mapped(|row| Ok((row.get::<_, i64>(0)?, row.get::<_, usize>(1)?)));
let mut accumulated_items_size = 0usize;
let mut limit_reached = false;
let mut rows_to_remove = Vec::new();
for result in content_sizes {
let (row_id, size) = match result {
Ok(content_size) => content_size,
Err(error) => {
return Err(error.into());
}
};
// Once the budget is exhausted, every remaining (older) row is removed,
// whatever its size.
if limit_reached {
rows_to_remove.push(row_id);
continue;
}
match accumulated_items_size.checked_add(size) {
Some(acc) if acc > max_cache_size => {
// We can stop accumulating.
limit_reached = true;
rows_to_remove.push(row_id);
}
Some(acc) => accumulated_items_size = acc,
None => {
// The accumulated size is overflowing but the setting cannot be
// bigger than usize::MAX, we can stop accumulating.
limit_reached = true;
rows_to_remove.push(row_id);
}
};
}
if !rows_to_remove.is_empty() {
removed = true;
}
// The row IDs are passed to `chunk_large_query_over`, which runs the
// `DELETE` once per chunk of IDs.
txn.chunk_large_query_over(rows_to_remove, None, |txn, row_ids| {
let sql_params = repeat_vars(row_ids.len());
let query = format!("DELETE FROM media WHERE rowid IN ({sql_params})");
txn.prepare(&query)?.execute(params_from_iter(row_ids))?;
Ok(Vec::<()>::new())
})?;
}
}
Ok(removed)
})
.await?;
// If we removed media, use the VACUUM command to defragment the
// database and free space on the filesystem.
if removed {
if let Err(error) = conn.execute("VACUUM", ()).await {
// Since this is an optimisation step, do not propagate the error
// but log it.
#[cfg(not(test))]
warn!("Failed to vacuum database: {error}");
// We want to know if there is an error with this step during tests.
#[cfg(test)]
return Err(error.into());
}
}
Ok(())
}
@@ -786,6 +1055,17 @@ fn insert_chunk(
Ok(())
}
/// Read the persisted [`MediaRetentionPolicy`] through the given connection.
///
/// Returns `Ok(None)` when nothing is stored under the
/// `keys::MEDIA_RETENTION_POLICY` key; otherwise the stored bytes are
/// deserialized with `rmp_serde::from_slice`.
async fn media_retention_policy(
    conn: &SqliteAsyncConn,
) -> Result<Option<MediaRetentionPolicy>, Error> {
    match conn.get_kv(keys::MEDIA_RETENTION_POLICY).await? {
        Some(serialized) => {
            let policy = rmp_serde::from_slice(&serialized)?;
            Ok(Some(policy))
        }
        None => Ok(None),
    }
}
#[cfg(test)]
mod tests {
use std::{
@@ -798,11 +1078,13 @@ mod tests {
event_cache::{
store::{
integration_tests::{check_test_event, make_test_event},
media::IgnoreMediaRetentionPolicy,
EventCacheStore, EventCacheStoreError,
},
Gap,
},
event_cache_store_integration_tests, event_cache_store_integration_tests_time,
event_cache_store_media_integration_tests,
linked_chunk::{ChunkContent, ChunkIdentifier, Position, Update},
media::{MediaFormat, MediaRequestParameters, MediaThumbnailSettings},
};
@@ -828,6 +1110,7 @@ mod tests {
event_cache_store_integration_tests!();
event_cache_store_integration_tests_time!();
event_cache_store_media_integration_tests!(with_media_size_tests);
async fn get_event_cache_store_content_sorted_by_last_access(
event_cache_store: &SqliteEventCacheStore,
@@ -863,7 +1146,7 @@ mod tests {
// Add the media.
event_cache_store
.add_media_content(&file_request, content.clone())
.add_media_content(&file_request, content.clone(), IgnoreMediaRetentionPolicy::No)
.await
.expect("adding file failed");
@@ -872,7 +1155,11 @@ mod tests {
tokio::time::sleep(Duration::from_secs(3)).await;
event_cache_store
.add_media_content(&thumbnail_request, thumbnail_content.clone())
.add_media_content(
&thumbnail_request,
thumbnail_content.clone(),
IgnoreMediaRetentionPolicy::No,
)
.await
.expect("adding thumbnail failed");
@@ -1476,7 +1763,7 @@ mod encrypted_tests {
use matrix_sdk_base::{
event_cache::store::EventCacheStoreError, event_cache_store_integration_tests,
event_cache_store_integration_tests_time,
event_cache_store_integration_tests_time, event_cache_store_media_integration_tests,
};
use once_cell::sync::Lazy;
use tempfile::{tempdir, TempDir};
@@ -1502,4 +1789,5 @@ mod encrypted_tests {
event_cache_store_integration_tests!();
event_cache_store_integration_tests_time!();
event_cache_store_media_integration_tests!();
}

View File

@@ -19,6 +19,7 @@ use async_trait::async_trait;
use deadpool_sqlite::Object as SqliteAsyncConn;
use itertools::Itertools;
use matrix_sdk_store_encryption::StoreCipher;
use ruma::time::SystemTime;
use rusqlite::{limits::Limit, OptionalExtension, Params, Row, Statement, Transaction};
use crate::{
@@ -187,7 +188,7 @@ impl SqliteAsyncConnExt for SqliteAsyncConn {
}
pub(crate) trait SqliteTransactionExt {
fn chunk_large_query_over<Query, Res>(
fn chunk_large_query_over<Key, Query, Res>(
&self,
keys_to_chunk: Vec<Key>,
result_capacity: Option<usize>,
@@ -199,7 +200,7 @@ pub(crate) trait SqliteTransactionExt {
}
impl SqliteTransactionExt for Transaction<'_> {
fn chunk_large_query_over<Query, Res>(
fn chunk_large_query_over<Key, Query, Res>(
&self,
mut keys_to_chunk: Vec<Key>,
result_capacity: Option<usize>,
@@ -380,8 +381,23 @@ pub(crate) fn repeat_vars(count: usize) -> impl fmt::Display {
iter::repeat("?").take(count).format(",")
}
/// Convert the given `SystemTime` to a timestamp, as the number of seconds
/// since Unix Epoch.
///
/// Returns an `i64` as it is the numeric type used by SQLite.
pub(crate) fn time_to_timestamp(time: SystemTime) -> i64 {
    // A value is always needed, so fall back to 0 both when `time` is before
    // the Unix Epoch and when the number of seconds does not fit in an `i64`.
    // Neither should happen unless the system clock is seriously wrong.
    match time.duration_since(SystemTime::UNIX_EPOCH) {
        Ok(since_epoch) => i64::try_from(since_epoch.as_secs()).unwrap_or(0),
        Err(_) => 0,
    }
}
#[cfg(test)]
mod unit_tests {
use std::time::Duration;
use super::*;
#[test]
@@ -396,4 +412,13 @@ mod unit_tests {
fn generating_zero_vars_panics() {
repeat_vars(0);
}
#[test]
fn test_time_to_timestamp() {
// The epoch itself and a time after it convert to their number of seconds.
assert_eq!(time_to_timestamp(SystemTime::UNIX_EPOCH), 0);
assert_eq!(time_to_timestamp(SystemTime::UNIX_EPOCH + Duration::from_secs(60)), 60);
// Fallback value for a time before the Unix Epoch.
assert_eq!(time_to_timestamp(SystemTime::UNIX_EPOCH - Duration::from_secs(60)), 0);
}
}

View File

@@ -26,6 +26,11 @@ All notable changes to this project will be documented in this file.
- Enable HTTP/2 support in the HTTP client.
([#4566](https://github.com/matrix-org/matrix-rust-sdk/pull/4566))
- The media contents stored in the media cache can now be controlled with a
`MediaRetentionPolicy` and the new `Media` methods `media_retention_policy()`,
`set_media_retention_policy()` and `clean_up_media_cache()`.
([#4571](https://github.com/matrix-org/matrix-rust-sdk/pull/4571))
### Refactor
- [**breaking**]: The reexported types `SyncTimelineEvent` and `TimelineEvent` have been fused into a single type

View File

@@ -23,7 +23,8 @@ use std::{fmt, fs::File, path::Path};
use eyeball::SharedObservable;
use futures_util::future::try_join;
pub use matrix_sdk_base::media::*;
use matrix_sdk_base::event_cache::store::media::IgnoreMediaRetentionPolicy;
pub use matrix_sdk_base::{event_cache::store::media::MediaRetentionPolicy, media::*};
use mime::Mime;
use ruma::{
api::{
@@ -518,7 +519,7 @@ impl Media {
.event_cache_store()
.lock()
.await?
.add_media_content(request, content.clone())
.add_media_content(request, content.clone(), IgnoreMediaRetentionPolicy::No)
.await?;
}
@@ -671,6 +672,44 @@ impl Media {
Ok(())
}
/// Set the [`MediaRetentionPolicy`] to use for deciding whether to store or
/// keep media content.
///
/// It is used:
///
/// * When a media needs to be cached, to check that it does not exceed the
/// max file size.
///
/// * When [`Media::clean_up_media_cache()`] is called, to check that all
/// media content in the store fits those criteria.
///
/// To apply the new policy to the media cache right away,
/// [`Media::clean_up_media_cache()`] should be called after this.
///
/// By default, an empty `MediaRetentionPolicy` is used, which means that no
/// criteria are applied.
///
/// # Arguments
///
/// * `policy` - The `MediaRetentionPolicy` to use.
///
/// # Errors
///
/// Returns an error if the event cache store lock cannot be acquired or if
/// the store fails to set the policy.
pub async fn set_media_retention_policy(&self, policy: MediaRetentionPolicy) -> Result<()> {
self.client.event_cache_store().lock().await?.set_media_retention_policy(policy).await?;
Ok(())
}
/// Get the current [`MediaRetentionPolicy`].
///
/// # Errors
///
/// Returns an error if the event cache store lock cannot be acquired;
/// reading the policy itself is infallible.
pub async fn media_retention_policy(&self) -> Result<MediaRetentionPolicy> {
Ok(self.client.event_cache_store().lock().await?.media_retention_policy())
}
/// Clean up the media cache with the current [`MediaRetentionPolicy`].
///
/// If there is already an ongoing cleanup, this is a noop.
///
/// # Errors
///
/// Returns an error if the event cache store lock cannot be acquired or if
/// the store fails to clean up the cache.
pub async fn clean_up_media_cache(&self) -> Result<()> {
self.client.event_cache_store().lock().await?.clean_up_media_cache().await?;
Ok(())
}
/// Upload the file bytes in `data` and return the source information.
pub(crate) async fn upload_plain_media_and_thumbnail(
&self,

View File

@@ -40,6 +40,7 @@ use matrix_sdk_base::{
deserialized_responses::{
RawAnySyncOrStrippedState, RawSyncOrStrippedState, SyncOrStrippedState,
},
event_cache::store::media::IgnoreMediaRetentionPolicy,
media::MediaThumbnailSettings,
store::StateStoreExt,
ComposerDraft, RoomInfoNotableUpdateReasons, RoomMemberships, StateChanges, StateStoreDataKey,
@@ -2038,7 +2039,10 @@ impl Room {
let request =
MediaRequestParameters { source: media_source.clone(), format: MediaFormat::File };
if let Err(err) = cache_store_lock_guard.add_media_content(&request, data).await {
if let Err(err) = cache_store_lock_guard
.add_media_content(&request, data, IgnoreMediaRetentionPolicy::No)
.await
{
warn!("unable to cache the media after uploading it: {err}");
}
@@ -2052,7 +2056,10 @@ impl Room {
format: MediaFormat::Thumbnail(MediaThumbnailSettings::new(width, height)),
};
if let Err(err) = cache_store_lock_guard.add_media_content(&request, data).await {
if let Err(err) = cache_store_lock_guard
.add_media_content(&request, data, IgnoreMediaRetentionPolicy::No)
.await
{
warn!("unable to cache the media after uploading it: {err}");
}
}

View File

@@ -15,6 +15,7 @@
//! Private implementations of the media upload mechanism.
use matrix_sdk_base::{
event_cache::store::media::IgnoreMediaRetentionPolicy,
media::{MediaFormat, MediaRequestParameters},
store::{
ChildTransactionId, DependentQueuedRequestKind, FinishUploadThumbnailInfo,
@@ -137,7 +138,12 @@ impl RoomSendQueue {
// Cache the file itself in the cache store.
cache_store
.add_media_content(&file_media_request, data.clone())
.add_media_content(
&file_media_request,
data.clone(),
// Make sure that the file is stored until it has been uploaded.
IgnoreMediaRetentionPolicy::Yes,
)
.await
.map_err(RoomSendQueueStorageError::EventCacheStoreError)?;
@@ -153,7 +159,12 @@ impl RoomSendQueue {
// Cache thumbnail in the cache store.
let thumbnail_media_request = Media::make_local_file_media_request(&txn);
cache_store
.add_media_content(&thumbnail_media_request, data)
.add_media_content(
&thumbnail_media_request,
data,
// Make sure that the thumbnail is stored until it has been uploaded.
IgnoreMediaRetentionPolicy::Yes,
)
.await
.map_err(RoomSendQueueStorageError::EventCacheStoreError)?;
@@ -256,6 +267,12 @@ impl QueueStorage {
.await
.map_err(RoomSendQueueStorageError::LockError)?;
// The media can now be removed during cleanups.
cache_store
.set_ignore_media_retention_policy(&from_req, IgnoreMediaRetentionPolicy::No)
.await
.map_err(RoomSendQueueStorageError::EventCacheStoreError)?;
cache_store
.replace_media_key(
&from_req,
@@ -281,6 +298,12 @@ impl QueueStorage {
trace!(from = ?from_req.source, to = ?new_source, "renaming thumbnail file key in cache store");
// The media can now be removed during cleanups.
cache_store
.set_ignore_media_retention_policy(&from_req, IgnoreMediaRetentionPolicy::No)
.await
.map_err(RoomSendQueueStorageError::EventCacheStoreError)?;
cache_store
.replace_media_key(
&from_req,