state store: add dependent queued events tables and operations

This commit is contained in:
Benjamin Bouvier
2024-07-15 16:32:17 +02:00
parent f0ef37efae
commit 9a7f18c62c
8 changed files with 583 additions and 18 deletions

View File

@@ -28,13 +28,13 @@ use ruma::{
AnySyncStateEvent, GlobalAccountDataEventType, RoomAccountDataEventType, StateEventType,
SyncStateEvent,
},
mxc_uri, owned_mxc_uri, room_id,
mxc_uri, owned_event_id, owned_mxc_uri, room_id,
serde::Raw,
uint, user_id, EventId, OwnedEventId, OwnedUserId, RoomId, TransactionId, UserId,
};
use serde_json::{json, value::Value as JsonValue};
use super::{DynStateStore, ServerCapabilities};
use super::{DependentQueuedEventKind, DynStateStore, ServerCapabilities};
use crate::{
deserialized_responses::MemberEvent,
media::{MediaFormat, MediaRequest, MediaThumbnailSettings},
@@ -89,6 +89,8 @@ pub trait StateStoreIntegrationTests {
async fn test_display_names_saving(&self);
/// Test operations with the send queue.
async fn test_send_queue(&self);
/// Test operations related to send queue dependents.
async fn test_send_queue_dependents(&self);
/// Test saving/restoring server capabilities.
async fn test_server_capabilities_saving(&self);
}
@@ -1481,6 +1483,88 @@ impl StateStoreIntegrationTests for DynStateStore {
assert!(outstanding_rooms.iter().any(|room| room == room_id));
assert!(outstanding_rooms.iter().any(|room| room == room_id2));
}
// Integration test for the dependent send queue event operations:
// save, list, update (attaching the parent's event id) and remove — plus
// the invariant that removing the depended-upon send queue event does NOT
// cascade-remove its dependent events.
async fn test_send_queue_dependents(&self) {
let room_id = room_id!("!test_send_queue_dependents:localhost");
// Save one send queue event to start with.
let txn0 = TransactionId::new();
let event0 =
SerializableEventContent::new(&RoomMessageEventContent::text_plain("hey").into())
.unwrap();
self.save_send_queue_event(room_id, txn0.clone(), event0).await.unwrap();
// No dependents, to start with.
assert!(self.list_dependent_send_queue_events(room_id).await.unwrap().is_empty());
// Save a redaction for that event.
self.save_dependent_send_queue_event(room_id, &txn0, DependentQueuedEventKind::Redact)
.await
.unwrap();
// It worked: the dependent is listed, carries the parent's transaction id,
// and has no event id yet (the parent hasn't been "sent").
let dependents = self.list_dependent_send_queue_events(room_id).await.unwrap();
assert_eq!(dependents.len(), 1);
assert_eq!(dependents[0].transaction_id, txn0);
assert!(dependents[0].event_id.is_none());
assert_matches!(dependents[0].kind, DependentQueuedEventKind::Redact);
// Update the event id.
let event_id = owned_event_id!("$1");
let num_updated =
self.update_dependent_send_queue_event(room_id, &txn0, event_id.clone()).await.unwrap();
assert_eq!(num_updated, 1);
// It worked: the dependent now carries the parent's event id.
let dependents = self.list_dependent_send_queue_events(room_id).await.unwrap();
assert_eq!(dependents.len(), 1);
assert_eq!(dependents[0].transaction_id, txn0);
assert_eq!(dependents[0].event_id.as_ref(), Some(&event_id));
assert_matches!(dependents[0].kind, DependentQueuedEventKind::Redact);
// Now remove it, using the id assigned by the store.
let removed =
self.remove_dependent_send_queue_event(room_id, dependents[0].id).await.unwrap();
assert!(removed);
// It worked.
assert!(self.list_dependent_send_queue_events(room_id).await.unwrap().is_empty());
// Now, inserting a dependent event and removing the original send queue event
// will NOT remove the dependent event.
let txn1 = TransactionId::new();
let event1 =
SerializableEventContent::new(&RoomMessageEventContent::text_plain("hey2").into())
.unwrap();
self.save_send_queue_event(room_id, txn1.clone(), event1).await.unwrap();
// Note: this dependent points at txn0, whose parent event gets removed below.
self.save_dependent_send_queue_event(room_id, &txn0, DependentQueuedEventKind::Redact)
.await
.unwrap();
assert_eq!(self.list_dependent_send_queue_events(room_id).await.unwrap().len(), 1);
self.save_dependent_send_queue_event(
room_id,
&txn1,
DependentQueuedEventKind::Edit {
new_content: SerializableEventContent::new(
&RoomMessageEventContent::text_plain("edit").into(),
)
.unwrap(),
},
)
.await
.unwrap();
assert_eq!(self.list_dependent_send_queue_events(room_id).await.unwrap().len(), 2);
// Remove event0 / txn0.
let removed = self.remove_send_queue_event(room_id, &txn0).await.unwrap();
assert!(removed);
// This has removed none of the dependent events.
let dependents = self.list_dependent_send_queue_events(room_id).await.unwrap();
assert_eq!(dependents.len(), 2);
}
}
/// Macro building to allow your StateStore implementation to run the entire
@@ -1649,6 +1733,12 @@ macro_rules! statestore_integration_tests {
let store = get_store().await.expect("creating store failed").into_state_store();
store.test_send_queue().await;
}
#[async_test]
async fn test_send_queue_dependents() {
let store = get_store().await.expect("creating store failed").into_state_store();
store.test_send_queue_dependents().await;
}
};
}

View File

@@ -15,7 +15,7 @@
use std::{
collections::{BTreeMap, BTreeSet, HashMap},
num::NonZeroUsize,
sync::RwLock as StdRwLock,
sync::{Mutex, RwLock as StdRwLock},
};
use async_trait::async_trait;
@@ -38,7 +38,8 @@ use tracing::{debug, instrument, trace, warn};
use super::{
traits::{ComposerDraft, QueuedEvent, SerializableEventContent, ServerCapabilities},
Result, RoomInfo, StateChanges, StateStore, StoreError,
DependentQueuedEvent, DependentQueuedEventKind, Result, RoomInfo, StateChanges, StateStore,
StoreError,
};
use crate::{
deserialized_responses::RawAnySyncOrStrippedState,
@@ -46,7 +47,7 @@ use crate::{
MinimalRoomMemberEvent, RoomMemberships, RoomState, StateStoreDataKey, StateStoreDataValue,
};
/// In-Memory, non-persistent implementation of the `StateStore`
/// In-memory, non-persistent implementation of the `StateStore`.
///
/// Default if no other is configured at startup.
#[allow(clippy::type_complexity)]
@@ -90,6 +91,8 @@ pub struct MemoryStore {
media: StdRwLock<RingBuffer<(OwnedMxcUri, String /* unique key */, Vec<u8>)>>,
custom: StdRwLock<HashMap<Vec<u8>, Vec<u8>>>,
send_queue_events: StdRwLock<BTreeMap<OwnedRoomId, Vec<QueuedEvent>>>,
dependent_send_queue_events: StdRwLock<BTreeMap<OwnedRoomId, Vec<DependentQueuedEvent>>>,
dependent_send_queue_event_next_id: Mutex<usize>,
}
// SAFETY: `new_unchecked` is safe because 20 is not zero.
@@ -120,6 +123,8 @@ impl Default for MemoryStore {
media: StdRwLock::new(RingBuffer::new(NUMBER_OF_MEDIAS)),
custom: Default::default(),
send_queue_events: Default::default(),
dependent_send_queue_events: Default::default(),
dependent_send_queue_event_next_id: Mutex::new(0),
}
}
}
@@ -996,6 +1001,74 @@ impl StateStore for MemoryStore {
// Returns the key set of the in-memory send queue map, i.e. the rooms
// which have (or have had) events in their send queue.
async fn load_rooms_with_unsent_events(&self) -> Result<Vec<OwnedRoomId>, Self::Error> {
Ok(self.send_queue_events.read().unwrap().keys().cloned().collect())
}
}
/// Append a new dependent event for the given transaction to this room's
/// in-memory list, assigning it the next monotonic id.
async fn save_dependent_send_queue_event(
    &self,
    room: &RoomId,
    transaction_id: &TransactionId,
    content: DependentQueuedEventKind,
) -> Result<(), Self::Error> {
    // Atomically fetch-and-increment the id counter.
    let id = {
        let mut guard = self.dependent_send_queue_event_next_id.lock().unwrap();
        let current = *guard;
        *guard = current + 1;
        current
    };

    let dependent = DependentQueuedEvent {
        id,
        kind: content,
        transaction_id: transaction_id.to_owned(),
        // The parent hasn't been sent yet, as far as we know.
        event_id: None,
    };

    // Append to this room's list, creating the list on first use.
    self.dependent_send_queue_events
        .write()
        .unwrap()
        .entry(room.to_owned())
        .or_default()
        .push(dependent);

    Ok(())
}
/// Mark every dependent event registered for `transaction_id` in this room
/// as ready, by filling in the event id of the (now sent) parent event.
///
/// Returns the number of updated entries.
async fn update_dependent_send_queue_event(
    &self,
    room: &RoomId,
    transaction_id: &TransactionId,
    event_id: OwnedEventId,
) -> Result<usize, Self::Error> {
    let mut map = self.dependent_send_queue_events.write().unwrap();

    // Use `get_mut` instead of `entry(..).or_default()`: a lookup for a
    // room without dependents should not insert an empty entry in the map.
    if let Some(dependents) = map.get_mut(room) {
        let mut num_updated = 0;
        for d in dependents.iter_mut().filter(|item| item.transaction_id == transaction_id) {
            d.event_id = Some(event_id.clone());
            num_updated += 1;
        }
        Ok(num_updated)
    } else {
        Ok(0)
    }
}
/// Remove the dependent event identified by `id` from this room's list.
///
/// Returns true iff an entry was actually removed.
async fn remove_dependent_send_queue_event(
    &self,
    room: &RoomId,
    id: usize,
) -> Result<bool, Self::Error> {
    let mut map = self.dependent_send_queue_events.write().unwrap();

    // `get_mut` avoids inserting an empty list for rooms that never had
    // any dependent events (unlike `entry(..).or_default()`).
    if let Some(dependents) = map.get_mut(room) {
        if let Some(pos) = dependents.iter().position(|item| item.id == id) {
            dependents.remove(pos);
            return Ok(true);
        }
    }
    Ok(false)
}
/// List all the dependent send queue events for this room, in insertion
/// order, whether they have an event id or not.
async fn list_dependent_send_queue_events(
    &self,
    room: &RoomId,
) -> Result<Vec<DependentQueuedEvent>, Self::Error> {
    let map = self.dependent_send_queue_events.read().unwrap();
    let dependents = match map.get(room) {
        Some(deps) => deps.clone(),
        None => Vec::new(),
    };
    Ok(dependents)
}
}
#[cfg(test)]

View File

@@ -72,9 +72,9 @@ pub use self::integration_tests::StateStoreIntegrationTests;
pub use self::{
memory_store::MemoryStore,
traits::{
ComposerDraft, ComposerDraftType, DynStateStore, IntoStateStore, QueuedEvent,
SerializableEventContent, ServerCapabilities, StateStore, StateStoreDataKey,
StateStoreDataValue, StateStoreExt,
ComposerDraft, ComposerDraftType, DependentQueuedEvent, DependentQueuedEventKind,
DynStateStore, IntoStateStore, QueuedEvent, SerializableEventContent, ServerCapabilities,
StateStore, StateStoreDataKey, StateStoreDataValue, StateStoreExt,
},
};

View File

@@ -454,6 +454,43 @@ pub trait StateStore: AsyncTraitDeps {
/// Loads all the rooms which have any pending events in their send queue.
async fn load_rooms_with_unsent_events(&self) -> Result<Vec<OwnedRoomId>, Self::Error>;
/// Add a new entry to the list of dependent send queue events for an
/// event.
///
/// `transaction_id` identifies the depended-upon (original) queued event;
/// `content` describes the user intent to apply to it.
async fn save_dependent_send_queue_event(
&self,
room_id: &RoomId,
transaction_id: &TransactionId,
content: DependentQueuedEventKind,
) -> Result<(), Self::Error>;
/// Update a set of dependent send queue events with an event id,
/// effectively marking them as ready.
///
/// All the dependent events created for `transaction_id` in this room are
/// updated at once, as several intents may target the same parent event.
///
/// Returns the number of updated events.
async fn update_dependent_send_queue_event(
&self,
room_id: &RoomId,
transaction_id: &TransactionId,
event_id: OwnedEventId,
) -> Result<usize, Self::Error>;
/// Remove a specific dependent send queue event by id.
///
/// The `id` is scoped to the given room; implementations may only
/// guarantee id uniqueness per room.
///
/// Returns true if the dependent send queue event has indeed been removed.
async fn remove_dependent_send_queue_event(
&self,
room_id: &RoomId,
id: usize,
) -> Result<bool, Self::Error>;
/// List all the dependent send queue events.
///
/// This returns absolutely all the dependent send queue events, whether
/// they have an event id or not. They must be returned in insertion order.
async fn list_dependent_send_queue_events(
&self,
room_id: &RoomId,
) -> Result<Vec<DependentQueuedEvent>, Self::Error>;
}
#[repr(transparent)]
@@ -726,6 +763,45 @@ impl<T: StateStore> StateStore for EraseStateStoreError<T> {
// Pure delegation: forward to the wrapped store, boxing the error type.
async fn load_rooms_with_unsent_events(&self) -> Result<Vec<OwnedRoomId>, Self::Error> {
self.0.load_rooms_with_unsent_events().await.map_err(Into::into)
}
// Pure delegation: forward to the wrapped store, boxing the error type.
async fn save_dependent_send_queue_event(
&self,
room_id: &RoomId,
transaction_id: &TransactionId,
content: DependentQueuedEventKind,
) -> Result<(), Self::Error> {
self.0
.save_dependent_send_queue_event(room_id, transaction_id, content)
.await
.map_err(Into::into)
}
// Pure delegation: forward to the wrapped store, boxing the error type.
async fn update_dependent_send_queue_event(
&self,
room_id: &RoomId,
transaction_id: &TransactionId,
event_id: OwnedEventId,
) -> Result<usize, Self::Error> {
self.0
.update_dependent_send_queue_event(room_id, transaction_id, event_id)
.await
.map_err(Into::into)
}
// Pure delegation: forward to the wrapped store, boxing the error type.
async fn remove_dependent_send_queue_event(
&self,
room_id: &RoomId,
id: usize,
) -> Result<bool, Self::Error> {
self.0.remove_dependent_send_queue_event(room_id, id).await.map_err(Into::into)
}
// Pure delegation: forward to the wrapped store, boxing the error type.
async fn list_dependent_send_queue_events(
&self,
room_id: &RoomId,
) -> Result<Vec<DependentQueuedEvent>, Self::Error> {
self.0.list_dependent_send_queue_events(room_id).await.map_err(Into::into)
}
}
/// Convenience functionality for state stores.
@@ -1163,6 +1239,50 @@ pub struct QueuedEvent {
pub is_wedged: bool,
}
/// The specific user intent that characterizes a [`DependentQueuedEvent`].
///
/// Serialized into the store, so new variants should only be appended.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum DependentQueuedEventKind {
/// The event should be edited.
Edit {
/// The new event for the content.
new_content: SerializableEventContent,
},
/// The event should be redacted/aborted/removed.
Redact,
}
/// An event to be sent, depending on a [`QueuedEvent`] to be sent first.
///
/// Depending on whether the event has been sent or not, this will either update
/// the local echo in the storage, or send an event equivalent to the user
/// intent to the homeserver.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct DependentQueuedEvent {
/// Unique identifier for this dependent queued event.
///
/// Useful for deletion.
///
/// NOTE(review): some store implementations only guarantee uniqueness
/// per room (ids are assigned per-room), which is sufficient since all
/// id-based operations are scoped by room.
pub id: usize,
/// The kind of user intent.
pub kind: DependentQueuedEventKind,
/// Transaction id for the parent's local echo / used in the server request.
///
/// Note: this is the transaction id used for the depended-on event, i.e.
/// the one that was originally sent and that's being modified with this
/// dependent event.
pub transaction_id: OwnedTransactionId,
/// If the parent event has been sent, the parent's event identifier
/// returned by the server once the local echo has been sent out.
///
/// Note: this is the event id used for the depended-on event after it's
/// been sent, not for a possible event that could have been sent
/// because of this [`DependentQueuedEvent`].
pub event_id: Option<OwnedEventId>,
}
#[cfg(not(tarpaulin_include))]
impl fmt::Debug for QueuedEvent {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {

View File

@@ -46,7 +46,7 @@ use super::{
};
use crate::IndexeddbStateStoreError;
const CURRENT_DB_VERSION: u32 = 9;
const CURRENT_DB_VERSION: u32 = 10;
const CURRENT_META_DB_VERSION: u32 = 2;
/// Sometimes Migrations can't proceed without having to drop existing
@@ -228,6 +228,9 @@ pub async fn upgrade_inner_db(
if old_version < 9 {
db = migrate_to_v9(db).await?;
}
if old_version < 10 {
db = migrate_to_v10(db).await?;
}
}
db.close();
@@ -744,6 +747,16 @@ async fn migrate_to_v9(db: IdbDatabase) -> Result<IdbDatabase> {
apply_migration(db, 9, migration).await
}
/// Migration to version 10: add the new [`keys::DEPENDENT_SEND_QUEUE`]
/// object store, leaving all existing stores and data untouched.
async fn migrate_to_v10(db: IdbDatabase) -> Result<IdbDatabase> {
    let create_stores = [keys::DEPENDENT_SEND_QUEUE].into_iter().collect();
    let migration = OngoingMigration {
        drop_stores: Default::default(),
        create_stores,
        data: Default::default(),
    };
    apply_migration(db, 10, migration).await
}
#[cfg(all(test, target_arch = "wasm32"))]
mod tests {
wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);

View File

@@ -26,8 +26,8 @@ use matrix_sdk_base::{
deserialized_responses::RawAnySyncOrStrippedState,
media::{MediaRequest, UniqueKey},
store::{
ComposerDraft, QueuedEvent, SerializableEventContent, ServerCapabilities, StateChanges,
StateStore, StoreError,
ComposerDraft, DependentQueuedEvent, DependentQueuedEventKind, QueuedEvent,
SerializableEventContent, ServerCapabilities, StateChanges, StateStore, StoreError,
},
MinimalRoomMemberEvent, RoomInfo, RoomMemberships, RoomState, StateStoreDataKey,
StateStoreDataValue,
@@ -108,7 +108,10 @@ mod keys {
pub const ROOM_INFOS: &str = "room_infos";
pub const PRESENCE: &str = "presence";
pub const ROOM_ACCOUNT_DATA: &str = "room_account_data";
/// Table used to save send queue events.
pub const ROOM_SEND_QUEUE: &str = "room_send_queue";
/// Table used to save dependent send queue events.
pub const DEPENDENT_SEND_QUEUE: &str = "room_dependent_send_queue";
pub const STRIPPED_ROOM_STATE: &str = "stripped_room_state";
pub const STRIPPED_USER_IDS: &str = "stripped_user_ids";
@@ -136,6 +139,7 @@ mod keys {
ROOM_USER_RECEIPTS,
ROOM_EVENT_RECEIPTS,
ROOM_SEND_QUEUE,
DEPENDENT_SEND_QUEUE,
MEDIA,
CUSTOM,
KV,
@@ -1316,7 +1320,7 @@ impl_state_store!({
async fn remove_room(&self, room_id: &RoomId) -> Result<()> {
// All the stores which use a RoomId as their key (and nothing additional).
let direct_stores = [keys::ROOM_INFOS, keys::ROOM_SEND_QUEUE];
let direct_stores = [keys::ROOM_INFOS, keys::ROOM_SEND_QUEUE, keys::DEPENDENT_SEND_QUEUE];
// All the stores which use a RoomId as the first part of their key, but may
// have some additional data in the key.
@@ -1464,9 +1468,10 @@ impl_state_store!({
) -> Result<bool> {
let encoded_key = self.encode_key(keys::ROOM_SEND_QUEUE, room_id);
let tx = self
.inner
.transaction_on_one_with_mode(keys::ROOM_SEND_QUEUE, IdbTransactionMode::Readwrite)?;
let tx = self.inner.transaction_on_multi_with_mode(
&[keys::ROOM_SEND_QUEUE, keys::DEPENDENT_SEND_QUEUE],
IdbTransactionMode::Readwrite,
)?;
let obj = tx.object_store(keys::ROOM_SEND_QUEUE)?;
@@ -1566,6 +1571,142 @@ impl_state_store!({
Ok(all_entries.into_iter().collect())
}
// Append a dependent event to the room's serialized vector, assigning it
// the next per-room id. The read-modify-write happens within a single
// read-write transaction, committed at the end.
async fn save_dependent_send_queue_event(
&self,
room_id: &RoomId,
transaction_id: &TransactionId,
content: DependentQueuedEventKind,
) -> Result<()> {
let encoded_key = self.encode_key(keys::DEPENDENT_SEND_QUEUE, room_id);
let tx = self.inner.transaction_on_one_with_mode(
keys::DEPENDENT_SEND_QUEUE,
IdbTransactionMode::Readwrite,
)?;
let obj = tx.object_store(keys::DEPENDENT_SEND_QUEUE)?;
// We store an encoded vector of the dependent events.
// Reload the previous vector for this room, or create an empty one.
let prev = obj.get(&encoded_key)?.await?;
let mut prev = prev.map_or_else(
|| Ok(Vec::new()),
|val| self.deserialize_value::<Vec<DependentQueuedEvent>>(&val),
)?;
// Find the next id by taking the biggest ID we had before, and add 1.
// Note: this makes ids unique within a room only, which is enough since
// removal is always scoped by room.
let next_id = prev.iter().fold(0, |max, item| item.id.max(max)) + 1;
// Push the new event.
prev.push(DependentQueuedEvent {
id: next_id,
kind: content,
transaction_id: transaction_id.to_owned(),
// Not sent yet, so no event id is known for the parent.
event_id: None,
});
// Save the new vector into db.
obj.put_key_val(&encoded_key, &self.serialize_value(&prev)?)?;
tx.await.into_result()?;
Ok(())
}
// Fill in the event id on every dependent event matching the transaction
// id, in one read-modify-write pass; returns the number of updated
// entries. The transaction is only committed if something changed.
async fn update_dependent_send_queue_event(
&self,
room_id: &RoomId,
transaction_id: &TransactionId,
event_id: OwnedEventId,
) -> Result<usize> {
let encoded_key = self.encode_key(keys::DEPENDENT_SEND_QUEUE, room_id);
let tx = self.inner.transaction_on_one_with_mode(
keys::DEPENDENT_SEND_QUEUE,
IdbTransactionMode::Readwrite,
)?;
let obj = tx.object_store(keys::DEPENDENT_SEND_QUEUE)?;
// We store an encoded vector of the dependent events.
// Reload the previous vector for this room, or create an empty one.
let prev = obj.get(&encoded_key)?.await?;
let mut prev = prev.map_or_else(
|| Ok(Vec::new()),
|val| self.deserialize_value::<Vec<DependentQueuedEvent>>(&val),
)?;
// Modify all events that match.
let mut num_updated = 0;
for entry in prev.iter_mut().filter(|entry| entry.transaction_id == transaction_id) {
entry.event_id = Some(event_id.clone());
num_updated += 1;
}
// Only write back (and await the transaction) if something changed.
if num_updated > 0 {
obj.put_key_val(&encoded_key, &self.serialize_value(&prev)?)?;
tx.await.into_result()?;
}
Ok(num_updated)
}
// Remove the dependent event with the given per-room id; returns true iff
// it was found and removed. When the room's vector becomes empty, the
// whole key is deleted instead of storing an empty vector.
async fn remove_dependent_send_queue_event(&self, room_id: &RoomId, id: usize) -> Result<bool> {
let encoded_key = self.encode_key(keys::DEPENDENT_SEND_QUEUE, room_id);
let tx = self.inner.transaction_on_one_with_mode(
keys::DEPENDENT_SEND_QUEUE,
IdbTransactionMode::Readwrite,
)?;
let obj = tx.object_store(keys::DEPENDENT_SEND_QUEUE)?;
// We store an encoded vector of the dependent events.
// Reload the previous vector for this room.
if let Some(val) = obj.get(&encoded_key)?.await? {
let mut prev = self.deserialize_value::<Vec<DependentQueuedEvent>>(&val)?;
if let Some(pos) = prev.iter().position(|item| item.id == id) {
prev.remove(pos);
// Drop the key entirely rather than storing an empty vector.
if prev.is_empty() {
obj.delete(&encoded_key)?;
} else {
obj.put_key_val(&encoded_key, &self.serialize_value(&prev)?)?;
}
tx.await.into_result()?;
return Ok(true);
}
}
Ok(false)
}
/// List all the dependent send queue events for this room, in insertion
/// order (the per-room vector is only ever appended to).
async fn list_dependent_send_queue_events(
    &self,
    room_id: &RoomId,
) -> Result<Vec<DependentQueuedEvent>> {
    let encoded_key = self.encode_key(keys::DEPENDENT_SEND_QUEUE, room_id);

    // We store an encoded vector of the dependent events. This is a pure
    // read, so open a read-only transaction: the previous read-write mode
    // would needlessly serialize against concurrent writers.
    let prev = self
        .inner
        .transaction_on_one_with_mode(
            keys::DEPENDENT_SEND_QUEUE,
            IdbTransactionMode::Readonly,
        )?
        .object_store(keys::DEPENDENT_SEND_QUEUE)?
        .get(&encoded_key)?
        .await?;

    prev.map_or_else(
        || Ok(Vec::new()),
        |val| self.deserialize_value::<Vec<DependentQueuedEvent>>(&val),
    )
}
});
/// A room member.

View File

@@ -0,0 +1,14 @@
-- Send queue dependent events
CREATE TABLE "dependent_send_queue_events" (
    -- This is used as a key, thus hashed.
    "room_id" BLOB NOT NULL,

    -- This is used as both a key and a value, thus neither encrypted/decrypted/hashed.
    "transaction_id" BLOB NOT NULL,

    -- Used as a value (thus encrypted/decrypted), can be null.
    "event_id" BLOB NULL,

    -- Serialized `DependentQueuedEventKind`, used as a value (thus encrypted/decrypted).
    "content" BLOB NOT NULL
);

-- Every query against this table filters on the room id, so index it.
CREATE INDEX "dependent_send_queue_events_room_id"
    ON "dependent_send_queue_events" ("room_id");

View File

@@ -11,7 +11,10 @@ use deadpool_sqlite::{Object as SqliteConn, Pool as SqlitePool, Runtime};
use matrix_sdk_base::{
deserialized_responses::{RawAnySyncOrStrippedState, SyncOrStrippedState},
media::{MediaRequest, UniqueKey},
store::{migration_helpers::RoomInfoV1, QueuedEvent, SerializableEventContent},
store::{
migration_helpers::RoomInfoV1, DependentQueuedEvent, DependentQueuedEventKind, QueuedEvent,
SerializableEventContent,
},
MinimalRoomMemberEvent, RoomInfo, RoomMemberships, RoomState, StateChanges, StateStore,
StateStoreDataKey, StateStoreDataValue,
};
@@ -57,9 +60,15 @@ mod keys {
pub const DISPLAY_NAME: &str = "display_name";
pub const MEDIA: &str = "media";
pub const SEND_QUEUE: &str = "send_queue_events";
pub const DEPENDENTS_SEND_QUEUE: &str = "dependent_send_queue_events";
}
const DATABASE_VERSION: u8 = 5;
/// Identifier of the latest database version.
///
/// This is used to figure whether the sqlite database requires a migration.
/// Every new SQL migration should imply a bump of this number, and changes in
/// the [`SqliteStateStore::run_migrations`] function.
const DATABASE_VERSION: u8 = 6;
/// A sqlite based cryptostore.
#[derive(Clone)]
@@ -235,6 +244,17 @@ impl SqliteStateStore {
.await?;
}
if from < 6 && to >= 6 {
conn.with_transaction(move |txn| {
// Create new table.
txn.execute_batch(include_str!(
"../migrations/state_store/005_send_queue_dependent_events.sql"
))?;
Result::<_, Error>::Ok(())
})
.await?;
}
conn.set_kv("version", vec![to]).await?;
Ok(())
@@ -1755,7 +1775,7 @@ impl StateStore for SqliteStateStore {
txn.prepare_cached(
"DELETE FROM send_queue_events WHERE room_id = ? AND transaction_id = ?",
)?
.execute((room_id, transaction_id))
.execute((room_id, &transaction_id))
})
.await?;
@@ -1838,6 +1858,100 @@ impl StateStore for SqliteStateStore {
.into_iter()
.collect())
}
// Insert a new dependent event row. The row's implicit ROWID serves as
// the `DependentQueuedEvent::id` (see `list_dependent_send_queue_events`).
async fn save_dependent_send_queue_event(
&self,
room_id: &RoomId,
transaction_id: &TransactionId,
content: DependentQueuedEventKind,
) -> Result<()> {
// Room id is used as a key, thus hashed; content is a value, thus
// serialized (and possibly encrypted) as JSON.
let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id);
let content = self.serialize_json(&content)?;
// See comment in `save_send_queue_event`.
let transaction_id = transaction_id.to_string();
self.acquire()
.await?
.with_transaction(move |txn| {
txn.prepare_cached("INSERT INTO dependent_send_queue_events (room_id, transaction_id, content) VALUES (?, ?, ?)")?.execute((room_id, transaction_id, content))?;
Ok(())
})
.await
}
// Set the parent's event id on every dependent event row matching the
// transaction id; returns the number of rows SQLite reports as updated.
async fn update_dependent_send_queue_event(
&self,
room_id: &RoomId,
transaction_id: &TransactionId,
event_id: OwnedEventId,
) -> Result<usize> {
let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id);
// The event id is a value, thus serialized (and possibly encrypted).
let event_id = self.serialize_value(&event_id)?;
// See comment in `save_send_queue_event`.
let transaction_id = transaction_id.to_string();
self.acquire()
.await?
.with_transaction(move |txn| {
Ok(txn.prepare_cached(
"UPDATE dependent_send_queue_events SET event_id = ? WHERE transaction_id = ? and room_id = ?",
)?
.execute((event_id, transaction_id, room_id))?)
})
.await
}
// Delete a dependent event row by id. The id is the table's ROWID, as
// returned by `list_dependent_send_queue_events`; scoping by room id
// guards against removing a row from another room.
async fn remove_dependent_send_queue_event(&self, room_id: &RoomId, id: usize) -> Result<bool> {
let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id);
let num_deleted = self
.acquire()
.await?
.with_transaction(move |txn| {
txn.prepare_cached(
"DELETE FROM dependent_send_queue_events WHERE ROWID = ? AND room_id = ?",
)?
.execute((id, room_id))
})
.await?;
// At most one row can match a given ROWID.
Ok(num_deleted > 0)
}
// List all dependent events for the room, ordered by ROWID — i.e. in
// insertion order, as required by the trait contract. The ROWID doubles
// as the `DependentQueuedEvent::id`.
async fn list_dependent_send_queue_events(
&self,
room_id: &RoomId,
) -> Result<Vec<DependentQueuedEvent>> {
let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id);
// Note: transaction_id is not encoded, see why in `save_send_queue_event`.
// Row layout: (ROWID, transaction_id, serialized event_id?, serialized kind).
let res: Vec<(usize, String, Option<Vec<u8>>, Vec<u8>)> = self
.acquire()
.await?
.prepare(
"SELECT ROWID, transaction_id, event_id, content FROM dependent_send_queue_events WHERE room_id = ? ORDER BY ROWID",
|mut stmt| {
stmt.query((room_id,))?
.mapped(|row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?)))
.collect()
},
)
.await?;
// Deserialize the value columns back into a `DependentQueuedEvent`.
let mut dependent_events = Vec::with_capacity(res.len());
for entry in res {
dependent_events.push(DependentQueuedEvent {
id: entry.0,
transaction_id: entry.1.into(),
event_id: entry.2.map(|bytes| self.deserialize_value(&bytes)).transpose()?,
kind: self.deserialize_json(&entry.3)?,
});
}
Ok(dependent_events)
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]