From 9a7f18c62c7cb20bb6edeaff489fcd795a950581 Mon Sep 17 00:00:00 2001 From: Benjamin Bouvier Date: Mon, 15 Jul 2024 16:32:17 +0200 Subject: [PATCH] state store: add dependent queued events tables and operations --- .../src/store/integration_tests.rs | 94 ++++++++++- .../matrix-sdk-base/src/store/memory_store.rs | 79 ++++++++- crates/matrix-sdk-base/src/store/mod.rs | 6 +- crates/matrix-sdk-base/src/store/traits.rs | 120 ++++++++++++++ .../src/state_store/migrations.rs | 15 +- .../src/state_store/mod.rs | 153 +++++++++++++++++- .../005_send_queue_dependent_events.sql | 14 ++ crates/matrix-sdk-sqlite/src/state_store.rs | 120 +++++++++++++- 8 files changed, 583 insertions(+), 18 deletions(-) create mode 100644 crates/matrix-sdk-sqlite/migrations/state_store/005_send_queue_dependent_events.sql diff --git a/crates/matrix-sdk-base/src/store/integration_tests.rs b/crates/matrix-sdk-base/src/store/integration_tests.rs index 5077156e8..533a6f064 100644 --- a/crates/matrix-sdk-base/src/store/integration_tests.rs +++ b/crates/matrix-sdk-base/src/store/integration_tests.rs @@ -28,13 +28,13 @@ use ruma::{ AnySyncStateEvent, GlobalAccountDataEventType, RoomAccountDataEventType, StateEventType, SyncStateEvent, }, - mxc_uri, owned_mxc_uri, room_id, + mxc_uri, owned_event_id, owned_mxc_uri, room_id, serde::Raw, uint, user_id, EventId, OwnedEventId, OwnedUserId, RoomId, TransactionId, UserId, }; use serde_json::{json, value::Value as JsonValue}; -use super::{DynStateStore, ServerCapabilities}; +use super::{DependentQueuedEventKind, DynStateStore, ServerCapabilities}; use crate::{ deserialized_responses::MemberEvent, media::{MediaFormat, MediaRequest, MediaThumbnailSettings}, @@ -89,6 +89,8 @@ pub trait StateStoreIntegrationTests { async fn test_display_names_saving(&self); /// Test operations with the send queue. async fn test_send_queue(&self); + /// Test operations related to send queue dependents. + async fn test_send_queue_dependents(&self); /// Test saving/restoring server capabilities. async fn test_server_capabilities_saving(&self); } @@ -1481,6 +1483,88 @@ impl StateStoreIntegrationTests for DynStateStore { assert!(outstanding_rooms.iter().any(|room| room == room_id)); assert!(outstanding_rooms.iter().any(|room| room == room_id2)); } + + async fn test_send_queue_dependents(&self) { + let room_id = room_id!("!test_send_queue_dependents:localhost"); + + // Save one send queue event to start with. + let txn0 = TransactionId::new(); + let event0 = + SerializableEventContent::new(&RoomMessageEventContent::text_plain("hey").into()) + .unwrap(); + self.save_send_queue_event(room_id, txn0.clone(), event0).await.unwrap(); + + // No dependents, to start with. + assert!(self.list_dependent_send_queue_events(room_id).await.unwrap().is_empty()); + + // Save a redaction for that event. + self.save_dependent_send_queue_event(room_id, &txn0, DependentQueuedEventKind::Redact) + .await + .unwrap(); + + // It worked. + let dependents = self.list_dependent_send_queue_events(room_id).await.unwrap(); + assert_eq!(dependents.len(), 1); + assert_eq!(dependents[0].transaction_id, txn0); + assert!(dependents[0].event_id.is_none()); + assert_matches!(dependents[0].kind, DependentQueuedEventKind::Redact); + + // Update the event id. + let event_id = owned_event_id!("$1"); + let num_updated = + self.update_dependent_send_queue_event(room_id, &txn0, event_id.clone()).await.unwrap(); + assert_eq!(num_updated, 1); + + // It worked. + let dependents = self.list_dependent_send_queue_events(room_id).await.unwrap(); + assert_eq!(dependents.len(), 1); + assert_eq!(dependents[0].transaction_id, txn0); + assert_eq!(dependents[0].event_id.as_ref(), Some(&event_id)); + assert_matches!(dependents[0].kind, DependentQueuedEventKind::Redact); + + // Now remove it. + let removed = + self.remove_dependent_send_queue_event(room_id, dependents[0].id).await.unwrap(); + assert!(removed); + + // It worked. + assert!(self.list_dependent_send_queue_events(room_id).await.unwrap().is_empty()); + + // Now, inserting a dependent event and removing the original send queue event + // will NOT remove the dependent event. + let txn1 = TransactionId::new(); + let event1 = + SerializableEventContent::new(&RoomMessageEventContent::text_plain("hey2").into()) + .unwrap(); + self.save_send_queue_event(room_id, txn1.clone(), event1).await.unwrap(); + + self.save_dependent_send_queue_event(room_id, &txn0, DependentQueuedEventKind::Redact) + .await + .unwrap(); + assert_eq!(self.list_dependent_send_queue_events(room_id).await.unwrap().len(), 1); + + self.save_dependent_send_queue_event( + room_id, + &txn1, + DependentQueuedEventKind::Edit { + new_content: SerializableEventContent::new( + &RoomMessageEventContent::text_plain("edit").into(), + ) + .unwrap(), + }, + ) + .await + .unwrap(); + assert_eq!(self.list_dependent_send_queue_events(room_id).await.unwrap().len(), 2); + + // Remove event0 / txn0. + let removed = self.remove_send_queue_event(room_id, &txn0).await.unwrap(); + assert!(removed); + + // This has removed none of the dependent events. + let dependents = self.list_dependent_send_queue_events(room_id).await.unwrap(); + assert_eq!(dependents.len(), 2); + } } /// Macro building to allow your StateStore implementation to run the entire @@ -1649,6 +1733,12 @@ macro_rules! statestore_integration_tests { let store = get_store().await.expect("creating store failed").into_state_store(); store.test_send_queue().await; } + + #[async_test] + async fn test_send_queue_dependents() { + let store = get_store().await.expect("creating store failed").into_state_store(); + store.test_send_queue_dependents().await; + } }; } diff --git a/crates/matrix-sdk-base/src/store/memory_store.rs b/crates/matrix-sdk-base/src/store/memory_store.rs index ad8a02e60..15fe6f5ce 100644 --- a/crates/matrix-sdk-base/src/store/memory_store.rs +++ b/crates/matrix-sdk-base/src/store/memory_store.rs @@ -15,7 +15,7 @@ use std::{ collections::{BTreeMap, BTreeSet, HashMap}, num::NonZeroUsize, - sync::RwLock as StdRwLock, + sync::{Mutex, RwLock as StdRwLock}, }; use async_trait::async_trait; @@ -38,7 +38,8 @@ use tracing::{debug, instrument, trace, warn}; use super::{ traits::{ComposerDraft, QueuedEvent, SerializableEventContent, ServerCapabilities}, - Result, RoomInfo, StateChanges, StateStore, StoreError, + DependentQueuedEvent, DependentQueuedEventKind, Result, RoomInfo, StateChanges, StateStore, + StoreError, }; use crate::{ deserialized_responses::RawAnySyncOrStrippedState, @@ -46,7 +47,7 @@ use crate::{ MinimalRoomMemberEvent, RoomMemberships, RoomState, StateStoreDataKey, StateStoreDataValue, }; -/// In-Memory, non-persistent implementation of the `StateStore` +/// In-memory, non-persistent implementation of the `StateStore`. /// /// Default if no other is configured at startup. #[allow(clippy::type_complexity)] @@ -90,6 +91,8 @@ pub struct MemoryStore { media: StdRwLock)>>, custom: StdRwLock, Vec>>, send_queue_events: StdRwLock>>, + dependent_send_queue_events: StdRwLock>>, + dependent_send_queue_event_next_id: Mutex, } // SAFETY: `new_unchecked` is safe because 20 is not zero. @@ -120,6 +123,8 @@ impl Default for MemoryStore { media: StdRwLock::new(RingBuffer::new(NUMBER_OF_MEDIAS)), custom: Default::default(), send_queue_events: Default::default(), + dependent_send_queue_events: Default::default(), + dependent_send_queue_event_next_id: Mutex::new(0), } } } @@ -996,6 +1001,74 @@ impl StateStore for MemoryStore { async fn load_rooms_with_unsent_events(&self) -> Result, Self::Error> { Ok(self.send_queue_events.read().unwrap().keys().cloned().collect()) } + + async fn save_dependent_send_queue_event( + &self, + room: &RoomId, + transaction_id: &TransactionId, + content: DependentQueuedEventKind, + ) -> Result<(), Self::Error> { + let id = { + let mut next_id = self.dependent_send_queue_event_next_id.lock().unwrap(); + // Don't tell anyone, but sometimes I miss C++'s `x++` operator. + let id = *next_id; + *next_id += 1; + id + }; + + self.dependent_send_queue_events.write().unwrap().entry(room.to_owned()).or_default().push( + DependentQueuedEvent { + id, + kind: content, + transaction_id: transaction_id.to_owned(), + event_id: None, + }, + ); + + Ok(()) + } + + async fn update_dependent_send_queue_event( + &self, + room: &RoomId, + transaction_id: &TransactionId, + event_id: OwnedEventId, + ) -> Result { + let mut dependent_send_queue_events = self.dependent_send_queue_events.write().unwrap(); + let dependents = dependent_send_queue_events.entry(room.to_owned()).or_default(); + let mut num_updated = 0; + for d in dependents.iter_mut().filter(|item| item.transaction_id == transaction_id) { + d.event_id = Some(event_id.clone()); + num_updated += 1; + } + Ok(num_updated) + } + + async fn remove_dependent_send_queue_event( + &self, + room: &RoomId, + id: usize, + ) -> Result { + let mut dependent_send_queue_events = self.dependent_send_queue_events.write().unwrap(); + let dependents = dependent_send_queue_events.entry(room.to_owned()).or_default(); + if let Some(pos) = dependents.iter().position(|item| item.id == id) { + dependents.remove(pos); + Ok(true) + } else { + Ok(false) + } + } + + /// List all the dependent send queue events. + /// + /// This returns absolutely all the dependent send queue events, whether + /// they have an event id or not. + async fn list_dependent_send_queue_events( + &self, + room: &RoomId, + ) -> Result, Self::Error> { + Ok(self.dependent_send_queue_events.read().unwrap().get(room).cloned().unwrap_or_default()) + } } #[cfg(test)] diff --git a/crates/matrix-sdk-base/src/store/mod.rs b/crates/matrix-sdk-base/src/store/mod.rs index 560842e65..ff0b61c65 100644 --- a/crates/matrix-sdk-base/src/store/mod.rs +++ b/crates/matrix-sdk-base/src/store/mod.rs @@ -72,9 +72,9 @@ pub use self::integration_tests::StateStoreIntegrationTests; pub use self::{ memory_store::MemoryStore, traits::{ - ComposerDraft, ComposerDraftType, DynStateStore, IntoStateStore, QueuedEvent, - SerializableEventContent, ServerCapabilities, StateStore, StateStoreDataKey, - StateStoreDataValue, StateStoreExt, + ComposerDraft, ComposerDraftType, DependentQueuedEvent, DependentQueuedEventKind, + DynStateStore, IntoStateStore, QueuedEvent, SerializableEventContent, ServerCapabilities, + StateStore, StateStoreDataKey, StateStoreDataValue, StateStoreExt, }, }; diff --git a/crates/matrix-sdk-base/src/store/traits.rs b/crates/matrix-sdk-base/src/store/traits.rs index ef6740dc1..ef83b9896 100644 --- a/crates/matrix-sdk-base/src/store/traits.rs +++ b/crates/matrix-sdk-base/src/store/traits.rs @@ -454,6 +454,43 @@ pub trait StateStore: AsyncTraitDeps { /// Loads all the rooms which have any pending events in their send queue. async fn load_rooms_with_unsent_events(&self) -> Result, Self::Error>; + + /// Add a new entry to the list of dependent send queue event for an event. + async fn save_dependent_send_queue_event( + &self, + room_id: &RoomId, + transaction_id: &TransactionId, + content: DependentQueuedEventKind, + ) -> Result<(), Self::Error>; + + /// Update a set of dependent send queue events with an event id, + /// effectively marking them as ready. + /// + /// Returns the number of updated events. + async fn update_dependent_send_queue_event( + &self, + room_id: &RoomId, + transaction_id: &TransactionId, + event_id: OwnedEventId, + ) -> Result; + + /// Remove a specific dependent send queue event by id. + /// + /// Returns true if the dependent send queue event has been indeed removed. + async fn remove_dependent_send_queue_event( + &self, + room: &RoomId, + id: usize, + ) -> Result; + + /// List all the dependent send queue events. + /// + /// This returns absolutely all the dependent send queue events, whether + /// they have an event id or not. They must be returned in insertion order. + async fn list_dependent_send_queue_events( + &self, + room: &RoomId, + ) -> Result, Self::Error>; } #[repr(transparent)] @@ -726,6 +763,45 @@ impl StateStore for EraseStateStoreError { async fn load_rooms_with_unsent_events(&self) -> Result, Self::Error> { self.0.load_rooms_with_unsent_events().await.map_err(Into::into) } + + async fn save_dependent_send_queue_event( + &self, + room_id: &RoomId, + transaction_id: &TransactionId, + content: DependentQueuedEventKind, + ) -> Result<(), Self::Error> { + self.0 + .save_dependent_send_queue_event(room_id, transaction_id, content) + .await + .map_err(Into::into) + } + + async fn update_dependent_send_queue_event( + &self, + room_id: &RoomId, + transaction_id: &TransactionId, + event_id: OwnedEventId, + ) -> Result { + self.0 + .update_dependent_send_queue_event(room_id, transaction_id, event_id) + .await + .map_err(Into::into) + } + + async fn remove_dependent_send_queue_event( + &self, + room_id: &RoomId, + id: usize, + ) -> Result { + self.0.remove_dependent_send_queue_event(room_id, id).await.map_err(Into::into) + } + + async fn list_dependent_send_queue_events( + &self, + room_id: &RoomId, + ) -> Result, Self::Error> { + self.0.list_dependent_send_queue_events(room_id).await.map_err(Into::into) + } } /// Convenience functionality for state stores. @@ -1163,6 +1239,50 @@ pub struct QueuedEvent { pub is_wedged: bool, } +/// The specific user intent that characterizes a [`DependentQueuedEvent`]. +#[derive(Clone, Debug, Serialize, Deserialize)] +pub enum DependentQueuedEventKind { + /// The event should be edited. + Edit { + /// The new event for the content. + new_content: SerializableEventContent, + }, + + /// The event should be redacted/aborted/removed. + Redact, +} + +/// An event to be sent, depending on a [`QueuedEvent`] to be sent first. +/// +/// Depending on whether the event has been sent or not, this will either update +/// the local echo in the storage, or send an event equivalent to the user +/// intent to the homeserver. +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct DependentQueuedEvent { + /// Unique identifier for this dependent queued event. + /// + /// Useful for deletion. + pub id: usize, + + /// The kind of user intent. + pub kind: DependentQueuedEventKind, + + /// Transaction id for the parent's local echo / used in the server request. + /// + /// Note: this is the transaction id used for the depended-on event, i.e. + /// the one that was originally sent and that's being modified with this + /// dependent event. + pub transaction_id: OwnedTransactionId, + + /// If the parent event has been sent, the parent's event identifier + /// returned by the server once the local echo has been sent out. + /// + /// Note: this is the event id used for the depended-on event after it's + /// been sent, not for a possible event that could have been sent + /// because of this [`DependentQueuedEvent`]. + pub event_id: Option, +} + #[cfg(not(tarpaulin_include))] impl fmt::Debug for QueuedEvent { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { diff --git a/crates/matrix-sdk-indexeddb/src/state_store/migrations.rs b/crates/matrix-sdk-indexeddb/src/state_store/migrations.rs index 01c879830..fb8c3e28f 100644 --- a/crates/matrix-sdk-indexeddb/src/state_store/migrations.rs +++ b/crates/matrix-sdk-indexeddb/src/state_store/migrations.rs @@ -46,7 +46,7 @@ use super::{ }; use crate::IndexeddbStateStoreError; -const CURRENT_DB_VERSION: u32 = 9; +const CURRENT_DB_VERSION: u32 = 10; const CURRENT_META_DB_VERSION: u32 = 2; /// Sometimes Migrations can't proceed without having to drop existing @@ -228,6 +228,9 @@ pub async fn upgrade_inner_db( if old_version < 9 { db = migrate_to_v9(db).await?; } + if old_version < 10 { + db = migrate_to_v10(db).await?; + } } db.close(); @@ -744,6 +747,16 @@ async fn migrate_to_v9(db: IdbDatabase) -> Result { apply_migration(db, 9, migration).await } +/// Add the new [`keys::DEPENDENT_SEND_QUEUE`] table. +async fn migrate_to_v10(db: IdbDatabase) -> Result { + let migration = OngoingMigration { + drop_stores: [].into(), + create_stores: [keys::DEPENDENT_SEND_QUEUE].into_iter().collect(), + data: Default::default(), + }; + apply_migration(db, 10, migration).await +} + #[cfg(all(test, target_arch = "wasm32"))] mod tests { wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser); diff --git a/crates/matrix-sdk-indexeddb/src/state_store/mod.rs b/crates/matrix-sdk-indexeddb/src/state_store/mod.rs index bd833d4b3..9054d04fa 100644 --- a/crates/matrix-sdk-indexeddb/src/state_store/mod.rs +++ b/crates/matrix-sdk-indexeddb/src/state_store/mod.rs @@ -26,8 +26,8 @@ use matrix_sdk_base::{ deserialized_responses::RawAnySyncOrStrippedState, media::{MediaRequest, UniqueKey}, store::{ - ComposerDraft, QueuedEvent, SerializableEventContent, ServerCapabilities, StateChanges, - StateStore, StoreError, + ComposerDraft, DependentQueuedEvent, DependentQueuedEventKind, QueuedEvent, + SerializableEventContent, ServerCapabilities, StateChanges, StateStore, StoreError, }, MinimalRoomMemberEvent, RoomInfo, RoomMemberships, RoomState, StateStoreDataKey, StateStoreDataValue, @@ -108,7 +108,10 @@ mod keys { pub const ROOM_INFOS: &str = "room_infos"; pub const PRESENCE: &str = "presence"; pub const ROOM_ACCOUNT_DATA: &str = "room_account_data"; + /// Table used to save send queue events. pub const ROOM_SEND_QUEUE: &str = "room_send_queue"; + /// Table used to save dependent send queue events. + pub const DEPENDENT_SEND_QUEUE: &str = "room_dependent_send_queue"; pub const STRIPPED_ROOM_STATE: &str = "stripped_room_state"; pub const STRIPPED_USER_IDS: &str = "stripped_user_ids"; @@ -136,6 +139,7 @@ mod keys { ROOM_USER_RECEIPTS, ROOM_EVENT_RECEIPTS, ROOM_SEND_QUEUE, + DEPENDENT_SEND_QUEUE, MEDIA, CUSTOM, KV, @@ -1316,7 +1320,7 @@ impl_state_store!({ async fn remove_room(&self, room_id: &RoomId) -> Result<()> { // All the stores which use a RoomId as their key (and nothing additional). - let direct_stores = [keys::ROOM_INFOS, keys::ROOM_SEND_QUEUE]; + let direct_stores = [keys::ROOM_INFOS, keys::ROOM_SEND_QUEUE, keys::DEPENDENT_SEND_QUEUE]; // All the stores which use a RoomId as the first part of their key, but may // have some additional data in the key. @@ -1464,9 +1468,10 @@ impl_state_store!({ ) -> Result { let encoded_key = self.encode_key(keys::ROOM_SEND_QUEUE, room_id); - let tx = self - .inner - .transaction_on_one_with_mode(keys::ROOM_SEND_QUEUE, IdbTransactionMode::Readwrite)?; + let tx = self.inner.transaction_on_multi_with_mode( + &[keys::ROOM_SEND_QUEUE, keys::DEPENDENT_SEND_QUEUE], + IdbTransactionMode::Readwrite, + )?; let obj = tx.object_store(keys::ROOM_SEND_QUEUE)?; @@ -1566,6 +1571,142 @@ impl_state_store!({ Ok(all_entries.into_iter().collect()) } + + async fn save_dependent_send_queue_event( + &self, + room_id: &RoomId, + transaction_id: &TransactionId, + content: DependentQueuedEventKind, + ) -> Result<()> { + let encoded_key = self.encode_key(keys::DEPENDENT_SEND_QUEUE, room_id); + + let tx = self.inner.transaction_on_one_with_mode( + keys::DEPENDENT_SEND_QUEUE, + IdbTransactionMode::Readwrite, + )?; + + let obj = tx.object_store(keys::DEPENDENT_SEND_QUEUE)?; + + // We store an encoded vector of the dependent events. + // Reload the previous vector for this room, or create an empty one. + let prev = obj.get(&encoded_key)?.await?; + + let mut prev = prev.map_or_else( + || Ok(Vec::new()), + |val| self.deserialize_value::>(&val), + )?; + + // Find the next id by taking the biggest ID we had before, and add 1. + let next_id = prev.iter().fold(0, |max, item| item.id.max(max)) + 1; + + // Push the new event. + prev.push(DependentQueuedEvent { + id: next_id, + kind: content, + transaction_id: transaction_id.to_owned(), + event_id: None, + }); + + // Save the new vector into db. + obj.put_key_val(&encoded_key, &self.serialize_value(&prev)?)?; + + tx.await.into_result()?; + + Ok(()) + } + + async fn update_dependent_send_queue_event( + &self, + room_id: &RoomId, + transaction_id: &TransactionId, + event_id: OwnedEventId, + ) -> Result { + let encoded_key = self.encode_key(keys::DEPENDENT_SEND_QUEUE, room_id); + + let tx = self.inner.transaction_on_one_with_mode( + keys::DEPENDENT_SEND_QUEUE, + IdbTransactionMode::Readwrite, + )?; + + let obj = tx.object_store(keys::DEPENDENT_SEND_QUEUE)?; + + // We store an encoded vector of the dependent events. + // Reload the previous vector for this room, or create an empty one. + let prev = obj.get(&encoded_key)?.await?; + + let mut prev = prev.map_or_else( + || Ok(Vec::new()), + |val| self.deserialize_value::>(&val), + )?; + + // Modify all events that match. + let mut num_updated = 0; + for entry in prev.iter_mut().filter(|entry| entry.transaction_id == transaction_id) { + entry.event_id = Some(event_id.clone()); + num_updated += 1; + } + + if num_updated > 0 { + obj.put_key_val(&encoded_key, &self.serialize_value(&prev)?)?; + tx.await.into_result()?; + } + + Ok(num_updated) + } + + async fn remove_dependent_send_queue_event(&self, room_id: &RoomId, id: usize) -> Result { + let encoded_key = self.encode_key(keys::DEPENDENT_SEND_QUEUE, room_id); + + let tx = self.inner.transaction_on_one_with_mode( + keys::DEPENDENT_SEND_QUEUE, + IdbTransactionMode::Readwrite, + )?; + + let obj = tx.object_store(keys::DEPENDENT_SEND_QUEUE)?; + + // We store an encoded vector of the dependent events. + // Reload the previous vector for this room. + if let Some(val) = obj.get(&encoded_key)?.await? { + let mut prev = self.deserialize_value::>(&val)?; + if let Some(pos) = prev.iter().position(|item| item.id == id) { + prev.remove(pos); + + if prev.is_empty() { + obj.delete(&encoded_key)?; + } else { + obj.put_key_val(&encoded_key, &self.serialize_value(&prev)?)?; + } + + tx.await.into_result()?; + return Ok(true); + } + } + + Ok(false) + } + + async fn list_dependent_send_queue_events( + &self, + room_id: &RoomId, + ) -> Result> { + let encoded_key = self.encode_key(keys::DEPENDENT_SEND_QUEUE, room_id); + + // We store an encoded vector of the dependent events. + let prev = self + .inner + .transaction_on_one_with_mode( + keys::DEPENDENT_SEND_QUEUE, + IdbTransactionMode::Readwrite, + )? + .object_store(keys::DEPENDENT_SEND_QUEUE)? + .get(&encoded_key)? + .await?; + + prev.map_or_else( + || Ok(Vec::new()), + |val| self.deserialize_value::>(&val), + ) + } }); /// A room member. diff --git a/crates/matrix-sdk-sqlite/migrations/state_store/005_send_queue_dependent_events.sql b/crates/matrix-sdk-sqlite/migrations/state_store/005_send_queue_dependent_events.sql new file mode 100644 index 000000000..fc9268801 --- /dev/null +++ b/crates/matrix-sdk-sqlite/migrations/state_store/005_send_queue_dependent_events.sql @@ -0,0 +1,14 @@ +-- Send queue dependent events +CREATE TABLE "dependent_send_queue_events" ( + -- This is used as a key, thus hashed. + "room_id" BLOB NOT NULL, + + -- This is used as both a key and a value, thus neither encrypted/decrypted/hashed. + "transaction_id" BLOB NOT NULL, + + -- Used as a value (thus encrypted/decrypted), can be null. + "event_id" BLOB NULL, + + -- Serialized `DependentQueuedEventKind`, used as a value (thus encrypted/decrypted). + "content" BLOB NOT NULL +); diff --git a/crates/matrix-sdk-sqlite/src/state_store.rs b/crates/matrix-sdk-sqlite/src/state_store.rs index a6fa9a02b..a7d5a6285 100644 --- a/crates/matrix-sdk-sqlite/src/state_store.rs +++ b/crates/matrix-sdk-sqlite/src/state_store.rs @@ -11,7 +11,10 @@ use deadpool_sqlite::{Object as SqliteConn, Pool as SqlitePool, Runtime}; use matrix_sdk_base::{ deserialized_responses::{RawAnySyncOrStrippedState, SyncOrStrippedState}, media::{MediaRequest, UniqueKey}, - store::{migration_helpers::RoomInfoV1, QueuedEvent, SerializableEventContent}, + store::{ + migration_helpers::RoomInfoV1, DependentQueuedEvent, DependentQueuedEventKind, QueuedEvent, + SerializableEventContent, + }, MinimalRoomMemberEvent, RoomInfo, RoomMemberships, RoomState, StateChanges, StateStore, StateStoreDataKey, StateStoreDataValue, }; @@ -57,9 +60,15 @@ mod keys { pub const DISPLAY_NAME: &str = "display_name"; pub const MEDIA: &str = "media"; pub const SEND_QUEUE: &str = "send_queue_events"; + pub const DEPENDENTS_SEND_QUEUE: &str = "dependent_send_queue_events"; } -const DATABASE_VERSION: u8 = 5; +/// Identifier of the latest database version. +/// +/// This is used to figure whether the sqlite database requires a migration. +/// Every new SQL migration should imply a bump of this number, and changes in +/// the [`SqliteStateStore::run_migrations`] function.. +const DATABASE_VERSION: u8 = 6; /// A sqlite based cryptostore. #[derive(Clone)] @@ -235,6 +244,17 @@ impl SqliteStateStore { .await?; } + if from < 6 && to >= 6 { + conn.with_transaction(move |txn| { + // Create new table. + txn.execute_batch(include_str!( + "../migrations/state_store/005_send_queue_dependent_events.sql" + ))?; + Result::<_, Error>::Ok(()) + }) + .await?; + } + conn.set_kv("version", vec![to]).await?; Ok(()) @@ -1755,7 +1775,7 @@ impl StateStore for SqliteStateStore { txn.prepare_cached( "DELETE FROM send_queue_events WHERE room_id = ? AND transaction_id = ?", )? - .execute((room_id, transaction_id)) + .execute((room_id, &transaction_id)) }) .await?; @@ -1838,6 +1858,100 @@ impl StateStore for SqliteStateStore { .into_iter() .collect()) } + + async fn save_dependent_send_queue_event( + &self, + room_id: &RoomId, + transaction_id: &TransactionId, + content: DependentQueuedEventKind, + ) -> Result<()> { + let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id); + let content = self.serialize_json(&content)?; + + // See comment in `save_send_queue_event`. + let transaction_id = transaction_id.to_string(); + + self.acquire() + .await? + .with_transaction(move |txn| { + txn.prepare_cached("INSERT INTO dependent_send_queue_events (room_id, transaction_id, content) VALUES (?, ?, ?)")?.execute((room_id, transaction_id, content))?; + Ok(()) + }) + .await + } + + async fn update_dependent_send_queue_event( + &self, + room_id: &RoomId, + transaction_id: &TransactionId, + event_id: OwnedEventId, + ) -> Result { + let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id); + let event_id = self.serialize_value(&event_id)?; + + // See comment in `save_send_queue_event`. + let transaction_id = transaction_id.to_string(); + + self.acquire() + .await? + .with_transaction(move |txn| { + Ok(txn.prepare_cached( + "UPDATE dependent_send_queue_events SET event_id = ? WHERE transaction_id = ? and room_id = ?", + )? + .execute((event_id, transaction_id, room_id))?) + }) + .await + } + + async fn remove_dependent_send_queue_event(&self, room_id: &RoomId, id: usize) -> Result { + let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id); + + let num_deleted = self + .acquire() + .await? + .with_transaction(move |txn| { + txn.prepare_cached( + "DELETE FROM dependent_send_queue_events WHERE ROWID = ? AND room_id = ?", + )? + .execute((id, room_id)) + }) + .await?; + + Ok(num_deleted > 0) + } + + async fn list_dependent_send_queue_events( + &self, + room_id: &RoomId, + ) -> Result> { + let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id); + + // Note: transaction_id is not encoded, see why in `save_send_queue_event`. + let res: Vec<(usize, String, Option>, Vec)> = self + .acquire() + .await? + .prepare( + "SELECT ROWID, transaction_id, event_id, content FROM dependent_send_queue_events WHERE room_id = ? ORDER BY ROWID", + |mut stmt| { + stmt.query((room_id,))? + .mapped(|row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?))) + .collect() + }, + ) + .await?; + + let mut dependent_events = Vec::with_capacity(res.len()); + for entry in res { + dependent_events.push(DependentQueuedEvent { + id: entry.0, + transaction_id: entry.1.into(), + event_id: entry.2.map(|bytes| self.deserialize_value(&bytes)).transpose()?, + kind: self.deserialize_json(&entry.3)?, + }); + } + + Ok(dependent_events) + } } #[derive(Debug, Clone, Serialize, Deserialize)]