diff --git a/crates/matrix-sdk-base/src/store/integration_tests.rs b/crates/matrix-sdk-base/src/store/integration_tests.rs index 03feadc83..c6b3b5a7c 100644 --- a/crates/matrix-sdk-base/src/store/integration_tests.rs +++ b/crates/matrix-sdk-base/src/store/integration_tests.rs @@ -90,6 +90,8 @@ pub trait StateStoreIntegrationTests { async fn test_send_queue_priority(&self); /// Test operations related to send queue dependents. async fn test_send_queue_dependents(&self); + /// Test an update to a send queue dependent request. + async fn test_update_send_queue_dependent(&self); /// Test saving/restoring server capabilities. async fn test_server_capabilities_saving(&self); } @@ -1548,6 +1550,54 @@ impl StateStoreIntegrationTests for DynStateStore { let dependents = self.load_dependent_queued_requests(room_id).await.unwrap(); assert_eq!(dependents.len(), 2); } + + async fn test_update_send_queue_dependent(&self) { + let room_id = room_id!("!test_send_queue_dependents:localhost"); + + let txn = TransactionId::new(); + + // Save a dependent redaction for an event. + let child_txn = ChildTransactionId::new(); + + self.save_dependent_queued_request( + room_id, + &txn, + child_txn.clone(), + DependentQueuedRequestKind::RedactEvent, + ) + .await + .unwrap(); + + // It worked. + let dependents = self.load_dependent_queued_requests(room_id).await.unwrap(); + assert_eq!(dependents.len(), 1); + assert_eq!(dependents[0].parent_transaction_id, txn); + assert_eq!(dependents[0].own_transaction_id, child_txn); + assert!(dependents[0].parent_key.is_none()); + assert_matches!(dependents[0].kind, DependentQueuedRequestKind::RedactEvent); + + // Make it a reaction, instead of a redaction. + self.update_dependent_queued_request( + room_id, + &child_txn, + DependentQueuedRequestKind::ReactEvent { key: "👍".to_owned() }, + ) + .await + .unwrap(); + + // It worked. + let dependents = self.load_dependent_queued_requests(room_id).await.unwrap(); + assert_eq!(dependents.len(), 1); + assert_eq!(dependents[0].parent_transaction_id, txn); + assert_eq!(dependents[0].own_transaction_id, child_txn); + assert!(dependents[0].parent_key.is_none()); + assert_matches!( + &dependents[0].kind, + DependentQueuedRequestKind::ReactEvent { key } => { + assert_eq!(key, "👍"); + } + ); + } } /// Macro building to allow your StateStore implementation to run the entire @@ -1706,6 +1756,12 @@ macro_rules! statestore_integration_tests { let store = get_store().await.expect("creating store failed").into_state_store(); store.test_send_queue_dependents().await; } + + #[async_test] + async fn test_update_send_queue_dependent() { + let store = get_store().await.expect("creating store failed").into_state_store(); + store.test_update_send_queue_dependent().await; + } } }; } diff --git a/crates/matrix-sdk-base/src/store/memory_store.rs b/crates/matrix-sdk-base/src/store/memory_store.rs index 844e87fe2..2c8e1d849 100644 --- a/crates/matrix-sdk-base/src/store/memory_store.rs +++ b/crates/matrix-sdk-base/src/store/memory_store.rs @@ -933,6 +933,23 @@ impl StateStore for MemoryStore { Ok(num_updated) } + async fn update_dependent_queued_request( + &self, + room: &RoomId, + own_transaction_id: &ChildTransactionId, + new_content: DependentQueuedRequestKind, + ) -> Result { + let mut dependent_send_queue_events = self.dependent_send_queue_events.write().unwrap(); + let dependents = dependent_send_queue_events.entry(room.to_owned()).or_default(); + for d in dependents.iter_mut() { + if d.own_transaction_id == *own_transaction_id { + d.kind = new_content; + return Ok(true); + } + } + Ok(false) + } + async fn remove_dependent_queued_request( &self, room: &RoomId, diff --git a/crates/matrix-sdk-base/src/store/traits.rs b/crates/matrix-sdk-base/src/store/traits.rs index ccc9707cd..6e34f4fe2 100644 --- a/crates/matrix-sdk-base/src/store/traits.rs +++ b/crates/matrix-sdk-base/src/store/traits.rs @@ -439,6 +439,16 @@ pub trait StateStore: AsyncTraitDeps { sent_parent_key: SentRequestKey, ) -> Result; + /// Update a dependent send queue request with the new content. + /// + /// Returns true if the request was found and could be updated. + async fn update_dependent_queued_request( + &self, + room_id: &RoomId, + own_transaction_id: &ChildTransactionId, + new_content: DependentQueuedRequestKind, + ) -> Result; + /// Remove a specific dependent send queue request by id. /// /// Returns true if the dependent send queue request has been indeed @@ -735,6 +745,18 @@ impl StateStore for EraseStateStoreError { ) -> Result, Self::Error> { self.0.load_dependent_queued_requests(room_id).await.map_err(Into::into) } + + async fn update_dependent_queued_request( + &self, + room_id: &RoomId, + own_transaction_id: &ChildTransactionId, + new_content: DependentQueuedRequestKind, + ) -> Result { + self.0 + .update_dependent_queued_request(room_id, own_transaction_id, new_content) + .await + .map_err(Into::into) + } } /// Convenience functionality for state stores. diff --git a/crates/matrix-sdk-indexeddb/src/state_store/mod.rs b/crates/matrix-sdk-indexeddb/src/state_store/mod.rs index 82fbc3918..b8ca7442b 100644 --- a/crates/matrix-sdk-indexeddb/src/state_store/mod.rs +++ b/crates/matrix-sdk-indexeddb/src/state_store/mod.rs @@ -1594,6 +1594,48 @@ impl_state_store!({ Ok(()) } + async fn update_dependent_queued_request( + &self, + room_id: &RoomId, + own_transaction_id: &ChildTransactionId, + new_content: DependentQueuedRequestKind, + ) -> Result { + let encoded_key = self.encode_key(keys::DEPENDENT_SEND_QUEUE, room_id); + + let tx = self.inner.transaction_on_one_with_mode( + keys::DEPENDENT_SEND_QUEUE, + IdbTransactionMode::Readwrite, + )?; + + let obj = tx.object_store(keys::DEPENDENT_SEND_QUEUE)?; + + // We store an encoded vector of the dependent requests. + // Reload the previous vector for this room, or create an empty one. + let prev = obj.get(&encoded_key)?.await?; + + let mut prev = prev.map_or_else( + || Ok(Vec::new()), + |val| self.deserialize_value::>(&val), + )?; + + // Modify the dependent request, if found. + let mut found = false; + for entry in prev.iter_mut() { + if entry.own_transaction_id == *own_transaction_id { + found = true; + entry.kind = new_content; + break; + } + } + + if found { + obj.put_key_val(&encoded_key, &self.serialize_value(&prev)?)?; + tx.await.into_result()?; + } + + Ok(found) + } + async fn mark_dependent_queued_requests_as_ready( &self, room_id: &RoomId, diff --git a/crates/matrix-sdk-sqlite/src/error.rs b/crates/matrix-sdk-sqlite/src/error.rs index 4a1eb5be8..ddfac2fbf 100644 --- a/crates/matrix-sdk-sqlite/src/error.rs +++ b/crates/matrix-sdk-sqlite/src/error.rs @@ -101,6 +101,9 @@ pub enum Error { #[error("Redaction failed: {0}")] Redaction(#[source] ruma::canonical_json::RedactionError), + + #[error("An update keyed by unique ID touched more than one entry")] + InconsistentUpdate, } macro_rules! impl_from { diff --git a/crates/matrix-sdk-sqlite/src/state_store.rs b/crates/matrix-sdk-sqlite/src/state_store.rs index 1996dac65..36ff843cc 100644 --- a/crates/matrix-sdk-sqlite/src/state_store.rs +++ b/crates/matrix-sdk-sqlite/src/state_store.rs @@ -1924,6 +1924,39 @@ impl StateStore for SqliteStateStore { .await } + async fn update_dependent_queued_request( + &self, + room_id: &RoomId, + own_transaction_id: &ChildTransactionId, + new_content: DependentQueuedRequestKind, + ) -> Result { + let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id); + let content = self.serialize_json(&new_content)?; + + // See comment in `save_send_queue_event`. + let own_txn_id = own_transaction_id.to_string(); + + let num_updated = self + .acquire() + .await? + .with_transaction(move |txn| { + txn.prepare_cached( + r#"UPDATE dependent_send_queue_events + SET content = ? + WHERE own_transaction_id = ? + AND room_id = ?"#, + )? + .execute((content, own_txn_id, room_id)) + }) + .await?; + + if num_updated > 1 { + return Err(Error::InconsistentUpdate); + } + + Ok(num_updated == 1) + } + async fn mark_dependent_queued_requests_as_ready( &self, room_id: &RoomId,