feat(base): add a way to update a dependent send queue request

This commit is contained in:
Benjamin Bouvier
2024-11-18 14:28:19 +01:00
parent fa47af3dd6
commit 0080f17c1f
6 changed files with 173 additions and 0 deletions

View File

@@ -90,6 +90,8 @@ pub trait StateStoreIntegrationTests {
async fn test_send_queue_priority(&self);
/// Test operations related to send queue dependents.
async fn test_send_queue_dependents(&self);
/// Test an update to a send queue dependent request.
async fn test_update_send_queue_dependent(&self);
/// Test saving/restoring server capabilities.
async fn test_server_capabilities_saving(&self);
}
@@ -1548,6 +1550,54 @@ impl StateStoreIntegrationTests for DynStateStore {
let dependents = self.load_dependent_queued_requests(room_id).await.unwrap();
assert_eq!(dependents.len(), 2);
}
async fn test_update_send_queue_dependent(&self) {
let room_id = room_id!("!test_send_queue_dependents:localhost");
let txn = TransactionId::new();
// Save a dependent redaction for an event.
let child_txn = ChildTransactionId::new();
self.save_dependent_queued_request(
room_id,
&txn,
child_txn.clone(),
DependentQueuedRequestKind::RedactEvent,
)
.await
.unwrap();
// It worked.
let dependents = self.load_dependent_queued_requests(room_id).await.unwrap();
assert_eq!(dependents.len(), 1);
assert_eq!(dependents[0].parent_transaction_id, txn);
assert_eq!(dependents[0].own_transaction_id, child_txn);
assert!(dependents[0].parent_key.is_none());
assert_matches!(dependents[0].kind, DependentQueuedRequestKind::RedactEvent);
// Make it a reaction, instead of a redaction.
self.update_dependent_queued_request(
room_id,
&child_txn,
DependentQueuedRequestKind::ReactEvent { key: "👍".to_owned() },
)
.await
.unwrap();
// It worked.
let dependents = self.load_dependent_queued_requests(room_id).await.unwrap();
assert_eq!(dependents.len(), 1);
assert_eq!(dependents[0].parent_transaction_id, txn);
assert_eq!(dependents[0].own_transaction_id, child_txn);
assert!(dependents[0].parent_key.is_none());
assert_matches!(
&dependents[0].kind,
DependentQueuedRequestKind::ReactEvent { key } => {
assert_eq!(key, "👍");
}
);
}
}
/// Macro building to allow your StateStore implementation to run the entire
@@ -1706,6 +1756,12 @@ macro_rules! statestore_integration_tests {
let store = get_store().await.expect("creating store failed").into_state_store();
store.test_send_queue_dependents().await;
}
#[async_test]
async fn test_update_send_queue_dependent() {
let store = get_store().await.expect("creating store failed").into_state_store();
store.test_update_send_queue_dependent().await;
}
}
};
}

View File

@@ -933,6 +933,23 @@ impl StateStore for MemoryStore {
Ok(num_updated)
}
async fn update_dependent_queued_request(
&self,
room: &RoomId,
own_transaction_id: &ChildTransactionId,
new_content: DependentQueuedRequestKind,
) -> Result<bool, Self::Error> {
let mut dependent_send_queue_events = self.dependent_send_queue_events.write().unwrap();
let dependents = dependent_send_queue_events.entry(room.to_owned()).or_default();
for d in dependents.iter_mut() {
if d.own_transaction_id == *own_transaction_id {
d.kind = new_content;
return Ok(true);
}
}
Ok(false)
}
async fn remove_dependent_queued_request(
&self,
room: &RoomId,

View File

@@ -439,6 +439,16 @@ pub trait StateStore: AsyncTraitDeps {
sent_parent_key: SentRequestKey,
) -> Result<usize, Self::Error>;
/// Update a dependent send queue request with the new content.
///
/// Returns true if the request was found and could be updated.
async fn update_dependent_queued_request(
&self,
room_id: &RoomId,
own_transaction_id: &ChildTransactionId,
new_content: DependentQueuedRequestKind,
) -> Result<bool, Self::Error>;
/// Remove a specific dependent send queue request by id.
///
/// Returns true if the dependent send queue request has been indeed
@@ -735,6 +745,18 @@ impl<T: StateStore> StateStore for EraseStateStoreError<T> {
) -> Result<Vec<DependentQueuedRequest>, Self::Error> {
self.0.load_dependent_queued_requests(room_id).await.map_err(Into::into)
}
async fn update_dependent_queued_request(
&self,
room_id: &RoomId,
own_transaction_id: &ChildTransactionId,
new_content: DependentQueuedRequestKind,
) -> Result<bool, Self::Error> {
self.0
.update_dependent_queued_request(room_id, own_transaction_id, new_content)
.await
.map_err(Into::into)
}
}
/// Convenience functionality for state stores.

View File

@@ -1594,6 +1594,48 @@ impl_state_store!({
Ok(())
}
async fn update_dependent_queued_request(
&self,
room_id: &RoomId,
own_transaction_id: &ChildTransactionId,
new_content: DependentQueuedRequestKind,
) -> Result<bool> {
let encoded_key = self.encode_key(keys::DEPENDENT_SEND_QUEUE, room_id);
let tx = self.inner.transaction_on_one_with_mode(
keys::DEPENDENT_SEND_QUEUE,
IdbTransactionMode::Readwrite,
)?;
let obj = tx.object_store(keys::DEPENDENT_SEND_QUEUE)?;
// We store an encoded vector of the dependent requests.
// Reload the previous vector for this room, or create an empty one.
let prev = obj.get(&encoded_key)?.await?;
let mut prev = prev.map_or_else(
|| Ok(Vec::new()),
|val| self.deserialize_value::<Vec<DependentQueuedRequest>>(&val),
)?;
// Modify the dependent request, if found.
let mut found = false;
for entry in prev.iter_mut() {
if entry.own_transaction_id == *own_transaction_id {
found = true;
entry.kind = new_content;
break;
}
}
if found {
obj.put_key_val(&encoded_key, &self.serialize_value(&prev)?)?;
tx.await.into_result()?;
}
Ok(found)
}
async fn mark_dependent_queued_requests_as_ready(
&self,
room_id: &RoomId,

View File

@@ -101,6 +101,9 @@ pub enum Error {
#[error("Redaction failed: {0}")]
Redaction(#[source] ruma::canonical_json::RedactionError),
#[error("An update keyed by unique ID touched more than one entry")]
InconsistentUpdate,
}
macro_rules! impl_from {

View File

@@ -1924,6 +1924,39 @@ impl StateStore for SqliteStateStore {
.await
}
async fn update_dependent_queued_request(
&self,
room_id: &RoomId,
own_transaction_id: &ChildTransactionId,
new_content: DependentQueuedRequestKind,
) -> Result<bool> {
let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id);
let content = self.serialize_json(&new_content)?;
// See comment in `save_send_queue_event`.
let own_txn_id = own_transaction_id.to_string();
let num_updated = self
.acquire()
.await?
.with_transaction(move |txn| {
txn.prepare_cached(
r#"UPDATE dependent_send_queue_events
SET content = ?
WHERE own_transaction_id = ?
AND room_id = ?"#,
)?
.execute((content, own_txn_id, room_id))
})
.await?;
if num_updated > 1 {
return Err(Error::InconsistentUpdate);
}
Ok(num_updated == 1)
}
async fn mark_dependent_queued_requests_as_ready(
&self,
room_id: &RoomId,