feat(base): Add automatic media cache cleanups to MediaService

Signed-off-by: Kévin Commaille <zecakeh@tedomum.fr>
This commit is contained in:
Kévin Commaille
2025-02-02 12:20:13 +01:00
parent af9a5edd59
commit 5aaa6bf187
3 changed files with 222 additions and 20 deletions

View File

@@ -15,9 +15,14 @@
use std::{fmt, sync::Arc};
use async_trait::async_trait;
use matrix_sdk_common::{locks::Mutex, AsyncTraitDeps};
use matrix_sdk_common::{
executor::{spawn, JoinHandle},
locks::Mutex,
AsyncTraitDeps, SendOutsideWasm, SyncOutsideWasm,
};
use ruma::{time::SystemTime, MxcUri};
use tokio::sync::Mutex as AsyncMutex;
use tracing::error;
use super::MediaRetentionPolicy;
use crate::{event_cache::store::EventCacheStoreError, media::MediaRequestParameters};
@@ -41,6 +46,15 @@ struct MediaServiceInner<Time: TimeProvider = DefaultTimeProvider> {
/// A mutex to ensure a single cleanup is running at a time.
cleanup_guard: AsyncMutex<()>,
/// The time of the last media cache cleanup.
last_media_cleanup_time: Mutex<Option<SystemTime>>,
/// The [`JoinHandle`] for an automatic media cleanup task.
///
/// Used to ensure that only one automatic cleanup is running at a time, and
/// to stop the cleanup when the [`MediaServiceInner`] is dropped.
automatic_media_cleanup_join_handle: Mutex<Option<JoinHandle<()>>>,
}
impl MediaService {
@@ -61,7 +75,7 @@ impl Default for MediaService {
impl<Time> MediaService<Time>
where
Time: TimeProvider,
Time: TimeProvider + 'static,
{
/// Construct a new `MediaService` with the given `TimeProvider` and an
/// empty `MediaRetentionPolicy`.
@@ -70,6 +84,8 @@ where
time_provider,
policy: Mutex::new(MediaRetentionPolicy::empty()),
cleanup_guard: AsyncMutex::new(()),
last_media_cleanup_time: Mutex::new(None),
automatic_media_cleanup_join_handle: Mutex::new(None),
};
Self { inner: Arc::new(inner) }
@@ -83,10 +99,18 @@ where
/// # Arguments
///
/// * `policy` - The `MediaRetentionPolicy` that was persisted in the store.
pub fn restore(&self, policy: Option<MediaRetentionPolicy>) {
pub fn restore(
&self,
policy: Option<MediaRetentionPolicy>,
last_media_cleanup_time: Option<SystemTime>,
) {
if let Some(policy) = policy {
*self.inner.policy.lock() = policy;
}
if let Some(time) = last_media_cleanup_time {
*self.inner.last_media_cleanup_time.lock() = Some(time);
}
}
/// Get the current time from the inner [`TimeProvider`].
@@ -101,7 +125,7 @@ where
/// * `store` - The `EventCacheStoreMedia`.
///
/// * `policy` - The `MediaRetentionPolicy` to use.
pub async fn set_media_retention_policy<Store: EventCacheStoreMedia>(
pub async fn set_media_retention_policy<Store: EventCacheStoreMedia + 'static>(
&self,
store: &Store,
policy: MediaRetentionPolicy,
@@ -110,6 +134,8 @@ where
*self.inner.policy.lock() = policy;
self.maybe_spawn_automatic_media_cache_cleanup(store, self.now());
Ok(())
}
@@ -130,7 +156,7 @@ where
///
/// * `ignore_policy` - Whether the current `MediaRetentionPolicy` should be
/// ignored.
pub async fn add_media_content<Store: EventCacheStoreMedia>(
pub async fn add_media_content<Store: EventCacheStoreMedia + 'static>(
&self,
store: &Store,
request: &MediaRequestParameters,
@@ -146,7 +172,14 @@ where
return Ok(());
}
store.add_media_content_inner(request, content, self.now(), policy, ignore_policy).await
let current_time = self.now();
store
.add_media_content_inner(request, content, current_time, policy, ignore_policy)
.await?;
self.maybe_spawn_automatic_media_cache_cleanup(store, current_time);
Ok(())
}
/// Set whether the current [`MediaRetentionPolicy`] should be ignored for
@@ -178,12 +211,17 @@ where
/// * `store` - The `EventCacheStoreMedia`.
///
/// * `request` - The `MediaRequestParameters` of the file.
pub async fn get_media_content<Store: EventCacheStoreMedia>(
pub async fn get_media_content<Store: EventCacheStoreMedia + 'static>(
&self,
store: &Store,
request: &MediaRequestParameters,
) -> Result<Option<Vec<u8>>, Store::Error> {
store.get_media_content_inner(request, self.now()).await
let current_time = self.now();
let content = store.get_media_content_inner(request, current_time).await?;
self.maybe_spawn_automatic_media_cache_cleanup(store, current_time);
Ok(content)
}
/// Get a media file's content associated to an `MxcUri` from the
@@ -194,12 +232,17 @@ where
/// * `store` - The `EventCacheStoreMedia`.
///
/// * `uri` - The `MxcUri` of the media file.
pub async fn get_media_content_for_uri<Store: EventCacheStoreMedia>(
pub async fn get_media_content_for_uri<Store: EventCacheStoreMedia + 'static>(
&self,
store: &Store,
uri: &MxcUri,
) -> Result<Option<Vec<u8>>, Store::Error> {
store.get_media_content_for_uri_inner(uri, self.now()).await
let current_time = self.now();
let content = store.get_media_content_for_uri_inner(uri, current_time).await?;
self.maybe_spawn_automatic_media_cache_cleanup(store, current_time);
Ok(content)
}
/// Clean up the media cache with the current `MediaRetentionPolicy`.
@@ -212,6 +255,14 @@ where
pub async fn clean_up_media_cache<Store: EventCacheStoreMedia>(
&self,
store: &Store,
) -> Result<(), Store::Error> {
self.clean_up_media_cache_inner(store, self.now()).await
}
async fn clean_up_media_cache_inner<Store: EventCacheStoreMedia>(
&self,
store: &Store,
current_time: SystemTime,
) -> Result<(), Store::Error> {
let Ok(_guard) = self.inner.cleanup_guard.try_lock() else {
// There is another ongoing cleanup.
@@ -225,7 +276,56 @@ where
return Ok(());
}
store.clean_up_media_cache_inner(policy, self.now()).await
store.clean_up_media_cache_inner(policy, current_time).await?;
*self.inner.last_media_cleanup_time.lock() = Some(current_time);
Ok(())
}
/// Spawn an automatic media cache cleanup, according to the media retention
/// policy.
///
/// A cleanup will be spawned if:
/// * The media retention policy's `cleanup_frequency` is set and enough
/// time has passed since the last cleanup.
/// * No other cleanup is running,
fn maybe_spawn_automatic_media_cache_cleanup<Store: EventCacheStoreMedia + 'static>(
&self,
store: &Store,
current_time: SystemTime,
) {
let mut join_handle = self.inner.automatic_media_cleanup_join_handle.lock();
if join_handle.as_ref().is_some_and(|join_handle| !join_handle.is_finished()) {
// There is an ongoing automatic media cache cleanup.
return;
}
let policy = self.media_retention_policy();
if policy.cleanup_frequency.is_none() || !policy.has_limitations() {
// Automatic cleanups are disabled or have no effect.
return;
}
let last_media_cleanup_time = *self.inner.last_media_cleanup_time.lock();
if last_media_cleanup_time.is_some_and(|last_cleanup_time| {
!policy.should_clean_up(current_time, last_cleanup_time)
}) {
// It is not time to clean up.
return;
}
let this = self.clone();
let store = store.clone();
let handle = spawn(async move {
if let Err(error) = this.clean_up_media_cache_inner(&store, current_time).await {
error!("Failed to run automatic media cache cleanup: {error}");
}
});
*join_handle = Some(handle);
}
}
@@ -238,6 +338,17 @@ where
}
}
impl<Time> Drop for MediaServiceInner<Time>
where
Time: TimeProvider,
{
fn drop(&mut self) {
if let Some(join_handle) = self.automatic_media_cleanup_join_handle.lock().take() {
join_handle.abort();
}
}
}
/// An abstract trait that can be used to implement different store backends
/// for the media cache of the SDK.
///
@@ -249,7 +360,7 @@ where
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
pub trait EventCacheStoreMedia: AsyncTraitDeps + Clone {
/// The error type used by this media cache store.
type Error: fmt::Debug + Into<EventCacheStoreError>;
type Error: fmt::Debug + fmt::Display + Into<EventCacheStoreError>;
/// The persisted media retention policy in the media cache.
async fn media_retention_policy_inner(
@@ -401,7 +512,7 @@ impl IgnoreMediaRetentionPolicy {
/// An abstract trait to provide the current `SystemTime` for the
/// [`MediaService`].
pub trait TimeProvider {
pub trait TimeProvider: SendOutsideWasm + SyncOutsideWasm {
/// The current time.
fn now(&self) -> SystemTime;
}
@@ -681,7 +792,7 @@ mod tests {
// By default an empty policy is used.
assert!(!service.media_retention_policy().has_limitations());
service.restore(None);
service.restore(None, None);
assert!(!service.media_retention_policy().has_limitations());
assert!(!store.accessed());
@@ -770,7 +881,7 @@ mod tests {
let service = MediaService::with_time_provider(MockTimeProvider::new(now));
// Check that restoring the policy works.
service.restore(Some(MediaRetentionPolicy::default()));
service.restore(Some(MediaRetentionPolicy::default()), None);
assert_eq!(service.media_retention_policy(), MediaRetentionPolicy::default());
assert!(!store.accessed());
@@ -904,4 +1015,91 @@ mod tests {
assert!(store.accessed());
assert_eq!(store.last_media_cleanup_time_inner().await.unwrap(), Some(now));
}
#[async_test]
async fn test_media_service_automatic_cleanup() {
// 64 bytes content.
let content = vec![0; 64];
let uri_1 = mxc_uri!("mxc://localhost/media-1");
let request_1 = MediaRequestParameters {
source: MediaSource::Plain(uri_1.to_owned()),
format: MediaFormat::File,
};
let uri_2 = mxc_uri!("mxc://localhost/media-2");
let request_2 = MediaRequestParameters {
source: MediaSource::Plain(uri_2.to_owned()),
format: MediaFormat::File,
};
let now = SystemTime::UNIX_EPOCH;
let store = MockEventCacheStoreMedia::default();
let service = MediaService::with_time_provider(MockTimeProvider::new(now));
// Set an empty policy.
let policy = MediaRetentionPolicy::empty();
service.set_media_retention_policy(&store, policy).await.unwrap();
// Add the contents.
service
.add_media_content(&store, &request_1, content.clone(), IgnoreMediaRetentionPolicy::No)
.await
.unwrap();
service
.add_media_content(&store, &request_2, content, IgnoreMediaRetentionPolicy::No)
.await
.unwrap();
assert!(service.inner.automatic_media_cleanup_join_handle.lock().is_none());
// Try to launch an automatic cleanup.
let now = now + Duration::from_secs(60);
service.inner.time_provider.set_now(now);
service.maybe_spawn_automatic_media_cache_cleanup(&store, now);
// No cleanup was spawned since automatic cleanups are disabled.
assert!(service.inner.automatic_media_cleanup_join_handle.lock().is_none());
// Set a policy with automatic cleanup every hour.
let policy = MediaRetentionPolicy::empty()
.with_cleanup_frequency(Some(Duration::from_secs(60 * 60)));
let now = now + Duration::from_secs(60);
service.inner.time_provider.set_now(now);
service.set_media_retention_policy(&store, policy).await.unwrap();
// No cleanup was spawned since the policy has no limitations.
assert!(service.inner.automatic_media_cleanup_join_handle.lock().is_none());
// Set a policy with automatic cleanup every hour and a max file size.
let policy = MediaRetentionPolicy::empty()
.with_cleanup_frequency(Some(Duration::from_secs(60 * 60)))
.with_max_file_size(Some(512));
let now = now + Duration::from_secs(60);
service.inner.time_provider.set_now(now);
service.set_media_retention_policy(&store, policy).await.unwrap();
// A cleanup was spawned since there was no last_media_cleanup_time.
let join_handle = service.inner.automatic_media_cleanup_join_handle.lock().take().unwrap();
join_handle.await.unwrap();
assert_eq!(store.last_media_cleanup_time_inner().await.unwrap(), Some(now));
// Try again one minute in the future, nothing is spawned because we need to
// wait for one hour.
let now = now + Duration::from_secs(60);
service.inner.time_provider.set_now(now);
service.get_media_content(&store, &request_1).await.unwrap();
assert!(service.inner.automatic_media_cleanup_join_handle.lock().is_none());
// Try again 2 hours in the future, another cleanup is spawned.
let now = now + Duration::from_secs(2 * 60 * 60);
service.inner.time_provider.set_now(now);
service.get_media_content_for_uri(&store, uri_1).await.unwrap();
let join_handle = service.inner.automatic_media_cleanup_join_handle.lock().take().unwrap();
join_handle.await.unwrap();
assert_eq!(store.last_media_cleanup_time_inner().await.unwrap(), Some(now));
}
}

View File

@@ -80,17 +80,20 @@ const NUMBER_OF_MEDIAS: NonZeroUsize = unsafe { NonZeroUsize::new_unchecked(20)
impl Default for MemoryStore {
fn default() -> Self {
// Given that the store is empty, we won't need to clean it up right away.
let last_media_cleanup_time = SystemTime::now();
let media_service = MediaService::new();
media_service.restore(None, Some(last_media_cleanup_time));
Self {
inner: Arc::new(StdRwLock::new(MemoryStoreInner {
media: RingBuffer::new(NUMBER_OF_MEDIAS),
leases: Default::default(),
events: RelationalLinkedChunk::new(),
media_retention_policy: None,
// Given that the store is empty, we won't need to clean it up right away.
last_media_cleanup_time: SystemTime::now(),
last_media_cleanup_time,
})),
// No need to call `restore()` since nothing is persisted.
media_service: MediaService::new(),
media_service,
}
}
}

View File

@@ -117,7 +117,8 @@ impl SqliteEventCacheStore {
let media_service = MediaService::new();
let media_retention_policy = conn.get_serialized_kv(keys::MEDIA_RETENTION_POLICY).await?;
media_service.restore(media_retention_policy);
let last_media_cleanup_time = conn.get_serialized_kv(keys::LAST_MEDIA_CLEANUP_TIME).await?;
media_service.restore(media_retention_policy, last_media_cleanup_time);
Ok(Self { store_cipher, pool, media_service })
}