fix(base): Move all fields of MemoryStore inside a StdRwLock<_>.

This patch creates a new `MemoryStoreInner` and moves all fields from
`MemoryStore` into this new type. All locks are removed, but a new lock
is added around `MemoryStoreInner`. That way we have a single lock.
This commit is contained in:
Ivan Enderlin
2024-11-25 15:53:28 +01:00
parent db9ee9d87b
commit faa8aa2b9c
3 changed files with 42 additions and 25 deletions

View File

@@ -34,9 +34,14 @@ use crate::{
#[allow(clippy::type_complexity)]
#[derive(Debug)]
pub struct MemoryStore {
media: StdRwLock<RingBuffer<(OwnedMxcUri, String /* unique key */, Vec<u8>)>>,
leases: StdRwLock<HashMap<String, (String, Instant)>>,
events: StdRwLock<RelationalLinkedChunk<Event, Gap>>,
inner: StdRwLock<MemoryStoreInner>,
}
/// All mutable state of `MemoryStore`, grouped so that a single
/// `StdRwLock<MemoryStoreInner>` guards every field at once (replacing the
/// previous one-lock-per-field layout).
#[derive(Debug)]
struct MemoryStoreInner {
// Media contents, bounded by a ring buffer of `NUMBER_OF_MEDIAS` entries;
// each entry is (MXC URI, unique key, raw bytes).
media: RingBuffer<(OwnedMxcUri, String /* unique key */, Vec<u8>)>,
// Lease locks: lease key -> (current holder, expiration instant).
leases: HashMap<String, (String, Instant)>,
// Event and gap storage, updated per room via `apply_updates`.
events: RelationalLinkedChunk<Event, Gap>,
}
// SAFETY: `new_unchecked` is safe because 20 is not zero.
@@ -45,9 +50,11 @@ const NUMBER_OF_MEDIAS: NonZeroUsize = unsafe { NonZeroUsize::new_unchecked(20)
impl Default for MemoryStore {
fn default() -> Self {
Self {
media: StdRwLock::new(RingBuffer::new(NUMBER_OF_MEDIAS)),
leases: Default::default(),
events: StdRwLock::new(RelationalLinkedChunk::new()),
inner: StdRwLock::new(MemoryStoreInner {
media: RingBuffer::new(NUMBER_OF_MEDIAS),
leases: Default::default(),
events: RelationalLinkedChunk::new(),
}),
}
}
}
@@ -70,7 +77,9 @@ impl EventCacheStore for MemoryStore {
key: &str,
holder: &str,
) -> Result<bool, Self::Error> {
Ok(try_take_leased_lock(&self.leases, lease_duration_ms, key, holder))
let mut inner = self.inner.write().unwrap();
Ok(try_take_leased_lock(&mut inner.leases, lease_duration_ms, key, holder))
}
async fn handle_linked_chunk_updates(
@@ -78,9 +87,9 @@ impl EventCacheStore for MemoryStore {
room_id: &RoomId,
updates: &[Update<Event, Gap>],
) -> Result<(), Self::Error> {
self.events.write().unwrap().apply_updates(room_id, updates);
let mut inner = self.inner.write().unwrap();
Ok(())
Ok(inner.events.apply_updates(room_id, updates))
}
async fn add_media_content(
@@ -90,8 +99,10 @@ impl EventCacheStore for MemoryStore {
) -> Result<()> {
// Avoid duplication. Let's try to remove it first.
self.remove_media_content(request).await?;
// Now, let's add it.
self.media.write().unwrap().push((request.uri().to_owned(), request.unique_key(), data));
let mut inner = self.inner.write().unwrap();
inner.media.push((request.uri().to_owned(), request.unique_key(), data));
Ok(())
}
@@ -103,8 +114,10 @@ impl EventCacheStore for MemoryStore {
) -> Result<(), Self::Error> {
let expected_key = from.unique_key();
let mut medias = self.media.write().unwrap();
if let Some((mxc, key, _)) = medias.iter_mut().find(|(_, key, _)| *key == expected_key) {
let mut inner = self.inner.write().unwrap();
if let Some((mxc, key, _)) = inner.media.iter_mut().find(|(_, key, _)| *key == expected_key)
{
*mxc = to.uri().to_owned();
*key = to.unique_key();
}
@@ -115,8 +128,9 @@ impl EventCacheStore for MemoryStore {
async fn get_media_content(&self, request: &MediaRequestParameters) -> Result<Option<Vec<u8>>> {
let expected_key = request.unique_key();
let media = self.media.read().unwrap();
Ok(media.iter().find_map(|(_media_uri, media_key, media_content)| {
let inner = self.inner.write().unwrap();
Ok(inner.media.iter().find_map(|(_media_uri, media_key, media_content)| {
(media_key == &expected_key).then(|| media_content.to_owned())
}))
}
@@ -124,23 +138,27 @@ impl EventCacheStore for MemoryStore {
async fn remove_media_content(&self, request: &MediaRequestParameters) -> Result<()> {
let expected_key = request.unique_key();
let mut media = self.media.write().unwrap();
let Some(index) = media
let mut inner = self.inner.write().unwrap();
let Some(index) = inner
.media
.iter()
.position(|(_media_uri, media_key, _media_content)| media_key == &expected_key)
else {
return Ok(());
};
media.remove(index);
inner.media.remove(index);
Ok(())
}
async fn remove_media_content_for_uri(&self, uri: &MxcUri) -> Result<()> {
let mut media = self.media.write().unwrap();
let mut inner = self.inner.write().unwrap();
let expected_key = uri.to_owned();
let positions = media
let positions = inner
.media
.iter()
.enumerate()
.filter_map(|(position, (media_uri, _media_key, _media_content))| {
@@ -150,7 +168,7 @@ impl EventCacheStore for MemoryStore {
// Iterate in reverse-order so that positions stay valid after first removals.
for position in positions.into_iter().rev() {
media.remove(position);
inner.media.remove(position);
}
Ok(())

View File

@@ -361,7 +361,7 @@ mod tests {
impl TestStore {
fn try_take_leased_lock(&self, lease_duration_ms: u32, key: &str, holder: &str) -> bool {
try_take_leased_lock(&self.leases, lease_duration_ms, key, holder)
try_take_leased_lock(&mut self.leases.write().unwrap(), lease_duration_ms, key, holder)
}
}
@@ -502,12 +502,11 @@ mod tests {
pub mod memory_store_helper {
use std::{
collections::{hash_map::Entry, HashMap},
sync::RwLock,
time::{Duration, Instant},
};
pub fn try_take_leased_lock(
leases: &RwLock<HashMap<String, (String, Instant)>>,
leases: &mut HashMap<String, (String, Instant)>,
lease_duration_ms: u32,
key: &str,
holder: &str,
@@ -515,7 +514,7 @@ pub mod memory_store_helper {
let now = Instant::now();
let expiration = now + Duration::from_millis(lease_duration_ms.into());
match leases.write().unwrap().entry(key.to_owned()) {
match leases.entry(key.to_owned()) {
// There is an existing holder.
Entry::Occupied(mut entry) => {
let (current_holder, current_expiration) = entry.get_mut();

View File

@@ -632,7 +632,7 @@ impl CryptoStore for MemoryStore {
key: &str,
holder: &str,
) -> Result<bool> {
Ok(try_take_leased_lock(&self.leases, lease_duration_ms, key, holder))
Ok(try_take_leased_lock(&mut self.leases.write().unwrap(), lease_duration_ms, key, holder))
}
}