Merge pull request #4932 from matrix-org/rav/history_sharing/save_key_bundle_data

crypto: store received room key bundle data information

Add hooks to the memory store and sqlite store to stash the information about room key data.
This commit is contained in:
Richard van der Hoff
2025-04-24 12:22:14 +01:00
committed by GitHub
10 changed files with 267 additions and 43 deletions

View File

@@ -75,11 +75,11 @@ use crate::{
store::{
Changes, CryptoStoreWrapper, DeviceChanges, IdentityChanges, IntoCryptoStore, MemoryStore,
PendingChanges, Result as StoreResult, RoomKeyInfo, RoomSettings, SecretImportError, Store,
StoreCache, StoreTransaction,
StoreCache, StoreTransaction, StoredRoomKeyBundleData,
},
types::{
events::{
olm_v1::{AnyDecryptedOlmEvent, DecryptedRoomKeyEvent},
olm_v1::{AnyDecryptedOlmEvent, DecryptedRoomKeyBundleEvent, DecryptedRoomKeyEvent},
room::encrypted::{
EncryptedEvent, EncryptedToDeviceEvent, RoomEncryptedEventContent,
RoomEventEncryptionScheme, SupportedEventEncryptionSchemes,
@@ -939,6 +939,26 @@ impl OlmMachine {
}
}
#[instrument()]
async fn receive_room_key_bundle(
&self,
sender_key: Curve25519PublicKey,
event: &DecryptedRoomKeyBundleEvent,
changes: &mut Changes,
) -> OlmResult<()> {
let Some(sender_device_keys) = &event.sender_device_keys else {
warn!("Received a room key bundle with no sender device keys: ignoring");
return Ok(());
};
changes.received_room_key_bundles.push(StoredRoomKeyBundleData {
sender_user: event.sender.clone(),
sender_data: SenderData::device_info(sender_device_keys.clone()),
bundle_data: event.content.clone(),
});
Ok(())
}
fn add_withheld_info(&self, changes: &mut Changes, event: &RoomKeyWithheldEvent) {
debug!(?event.content, "Processing `m.room_key.withheld` event");
@@ -1184,6 +1204,10 @@ impl OlmMachine {
AnyDecryptedOlmEvent::Dummy(_) => {
debug!("Received an `m.dummy` event");
}
AnyDecryptedOlmEvent::RoomKeyBundle(e) => {
debug!("Received a room key bundle event {:?}", e);
self.receive_room_key_bundle(decrypted.result.sender_key, e, changes).await?;
}
AnyDecryptedOlmEvent::Custom(_) => {
warn!("Received an unexpected encrypted to-device event");
}

View File

@@ -33,7 +33,7 @@ use ruma::{
AddMentions, MessageType, Relation, ReplyWithinThread, RoomMessageEventContent,
},
AnyMessageLikeEvent, AnyMessageLikeEventContent, AnyToDeviceEvent, MessageLikeEvent,
OriginalMessageLikeEvent,
OriginalMessageLikeEvent, ToDeviceEventType,
},
room_id,
serde::Raw,
@@ -115,6 +115,7 @@ pub fn to_device_requests_to_content(
requests: Vec<Arc<ToDeviceRequest>>,
) -> ToDeviceEncryptedEventContent {
let to_device_request = &requests[0];
assert_eq!(to_device_request.event_type, ToDeviceEventType::RoomEncrypted);
to_device_request
.messages

View File

@@ -52,7 +52,7 @@ macro_rules! cryptostore_integration_tests {
},
store::{
BackupDecryptionKey, Changes, CryptoStore, DehydratedDeviceKey, DeviceChanges, GossipRequest,
IdentityChanges, PendingChanges, RoomSettings,
IdentityChanges, PendingChanges, RoomSettings, StoredRoomKeyBundleData,
},
testing::{get_device, get_other_identity, get_own_identity},
types::{
@@ -64,6 +64,7 @@ macro_rules! cryptostore_integration_tests {
CommonWithheldCodeContent, MegolmV1AesSha2WithheldContent,
RoomKeyWithheldContent,
},
room_key_bundle::RoomKeyBundleContent,
secret_send::SecretSendContent,
ToDeviceEvent,
},
@@ -1276,6 +1277,57 @@ macro_rules! cryptostore_integration_tests {
assert_eq!(None, loaded_2);
}
#[async_test]
#[ignore] // not yet implemented for all stores
async fn test_received_room_key_bundle() {
let store = get_store("received_room_key_bundle", None, true).await;
let test_room = room_id!("!room:example.org");
fn make_bundle_data(sender_user: &UserId, bundle_uri: &str) -> StoredRoomKeyBundleData {
let jwk = ruma::events::room::JsonWebKeyInit {
kty: "oct".to_owned(),
key_ops: vec!["encrypt".to_owned(), "decrypt".to_owned()],
alg: "A256CTR".to_owned(),
k: ruma::serde::Base64::new(vec![0u8; 0]),
ext: true,
}.into();
let file = ruma::events::room::EncryptedFileInit {
url: ruma::OwnedMxcUri::from(bundle_uri),
key: jwk,
iv: ruma::serde::Base64::new(vec![0u8; 0]),
hashes: Default::default(),
v: "".to_owned(),
}.into();
StoredRoomKeyBundleData {
sender_user: sender_user.to_owned(),
sender_data: SenderData::unknown(),
bundle_data: RoomKeyBundleContent {
room_id: room_id!("!room:example.org").to_owned(),
file,
},
}
}
// Add three entries
let changes = Changes {
received_room_key_bundles: vec![
make_bundle_data(user_id!("@alice:example.com"), "alice1"),
make_bundle_data(user_id!("@bob:example.com"), "bob1"),
make_bundle_data(user_id!("@alice:example.com"), "alice2"),
],
..Default::default()
};
store.save_changes(changes).await.unwrap();
// Check we get the right one
let bundle = store.get_received_room_key_bundle_data(
test_room, user_id!("@alice:example.com")
).await.unwrap().expect("Did not get any bundle data");
assert_eq!(bundle.bundle_data.file.url.to_string(), "alice2");
}
fn session_info(session: &InboundGroupSession) -> (&RoomId, &str) {
(&session.room_id(), &session.session_id())
}

View File

@@ -33,6 +33,7 @@ use vodozemac::Curve25519PublicKey;
use super::{
caches::DeviceStore, Account, BackupKeys, Changes, CryptoStore, DehydratedDeviceKey,
InboundGroupSession, PendingChanges, RoomKeyCounts, RoomSettings, Session,
StoredRoomKeyBundleData,
};
use crate::{
gossiping::{GossipRequest, GossippedSecret, SecretInfo},
@@ -71,7 +72,7 @@ impl BackupVersion {
}
/// An in-memory only store that will forget all the E2EE key once it's dropped.
#[derive(Debug)]
#[derive(Default, Debug)]
pub struct MemoryStore {
static_account: Arc<StdRwLock<Option<StaticAccountData>>>,
@@ -102,36 +103,10 @@ pub struct MemoryStore {
dehydrated_device_pickle_key: RwLock<Option<DehydratedDeviceKey>>,
next_batch_token: RwLock<Option<String>>,
room_settings: StdRwLock<HashMap<OwnedRoomId, RoomSettings>>,
save_changes_lock: Arc<Mutex<()>>,
}
room_key_bundles:
StdRwLock<HashMap<OwnedRoomId, HashMap<OwnedUserId, StoredRoomKeyBundleData>>>,
impl Default for MemoryStore {
fn default() -> Self {
MemoryStore {
static_account: Default::default(),
account: Default::default(),
sessions: Default::default(),
inbound_group_sessions: Default::default(),
inbound_group_sessions_backed_up_to: Default::default(),
outbound_group_sessions: Default::default(),
private_identity: Default::default(),
tracked_users: Default::default(),
olm_hashes: Default::default(),
devices: DeviceStore::new(),
identities: Default::default(),
outgoing_key_requests: Default::default(),
key_requests_by_info: Default::default(),
direct_withheld_info: Default::default(),
custom_values: Default::default(),
leases: Default::default(),
backup_keys: Default::default(),
dehydrated_device_pickle_key: Default::default(),
secret_inbox: Default::default(),
next_batch_token: Default::default(),
room_settings: Default::default(),
save_changes_lock: Default::default(),
}
}
save_changes_lock: Arc<Mutex<()>>,
}
impl MemoryStore {
@@ -348,6 +323,16 @@ impl CryptoStore for MemoryStore {
settings.extend(changes.room_settings);
}
if !changes.received_room_key_bundles.is_empty() {
let mut room_key_bundles = self.room_key_bundles.write();
for bundle in changes.received_room_key_bundles {
room_key_bundles
.entry(bundle.bundle_data.room_id.clone())
.or_default()
.insert(bundle.sender_user.clone(), bundle);
}
}
Ok(())
}
@@ -719,6 +704,18 @@ impl CryptoStore for MemoryStore {
Ok(self.room_settings.read().get(room_id).cloned())
}
async fn get_received_room_key_bundle_data(
&self,
room_id: &RoomId,
user_id: &UserId,
) -> Result<Option<StoredRoomKeyBundleData>> {
let guard = self.room_key_bundles.read();
let result = guard.get(room_id).and_then(|bundles| bundles.get(user_id).cloned());
Ok(result)
}
async fn get_custom_value(&self, key: &str) -> Result<Option<Vec<u8>>> {
Ok(self.custom_values.read().get(key).cloned())
}
@@ -1249,7 +1246,7 @@ mod integration_tests {
},
store::{
BackupKeys, Changes, CryptoStore, DehydratedDeviceKey, PendingChanges, RoomKeyCounts,
RoomSettings,
RoomSettings, StoredRoomKeyBundleData,
},
types::events::room_key_withheld::RoomKeyWithheldEvent,
Account, DeviceData, GossipRequest, GossippedSecret, SecretInfo, Session, TrackedUser,
@@ -1511,6 +1508,14 @@ mod integration_tests {
self.0.get_room_settings(room_id).await
}
async fn get_received_room_key_bundle_data(
&self,
room_id: &RoomId,
user_id: &UserId,
) -> crate::store::Result<Option<StoredRoomKeyBundleData>, Self::Error> {
self.0.get_received_room_key_bundle_data(room_id, user_id).await
}
async fn get_custom_value(&self, key: &str) -> Result<Option<Vec<u8>>, Self::Error> {
self.0.get_custom_value(key).await
}

View File

@@ -70,7 +70,7 @@ use crate::{
identities::{user::UserIdentity, Device, DeviceData, UserDevices, UserIdentityData},
olm::{
Account, ExportedRoomKey, InboundGroupSession, OlmMessageHash, OutboundGroupSession,
PrivateCrossSigningIdentity, Session, StaticAccountData,
PrivateCrossSigningIdentity, SenderData, Session, StaticAccountData,
},
types::{
events::room_key_withheld::RoomKeyWithheldEvent, BackupSecrets, CrossSigningSecrets,
@@ -101,7 +101,8 @@ pub use memorystore::MemoryStore;
pub use traits::{CryptoStore, DynCryptoStore, IntoCryptoStore};
use crate::types::{
events::room_key_withheld::RoomKeyWithheldContent, room_history::RoomKeyBundle,
events::{room_key_bundle::RoomKeyBundleContent, room_key_withheld::RoomKeyWithheldContent},
room_history::RoomKeyBundle,
};
pub use crate::{
dehydrated_devices::DehydrationError,
@@ -541,6 +542,26 @@ pub struct Changes {
pub room_settings: HashMap<OwnedRoomId, RoomSettings>,
pub secrets: Vec<GossippedSecret>,
pub next_batch_token: Option<String>,
/// Historical room key history bundles that we have received and should
/// store.
pub received_room_key_bundles: Vec<StoredRoomKeyBundleData>,
}
/// Information about an [MSC4268] room key bundle.
///
/// [MSC4268]: https://github.com/matrix-org/matrix-spec-proposals/pull/4268
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct StoredRoomKeyBundleData {
/// The user that sent us this data.
pub sender_user: OwnedUserId,
/// Information about the sender of this data and how much we trust that
/// information.
pub sender_data: SenderData,
/// The room key bundle data itself.
pub bundle_data: RoomKeyBundleContent,
}
/// A user for which we are tracking the list of devices.
@@ -573,6 +594,7 @@ impl Changes {
&& self.room_settings.is_empty()
&& self.secrets.is_empty()
&& self.next_batch_token.is_none()
&& self.received_room_key_bundles.is_empty()
}
}

View File

@@ -23,7 +23,7 @@ use vodozemac::Curve25519PublicKey;
use super::{
BackupKeys, Changes, CryptoStoreError, DehydratedDeviceKey, PendingChanges, Result,
RoomKeyCounts, RoomSettings,
RoomKeyCounts, RoomSettings, StoredRoomKeyBundleData,
};
#[cfg(doc)]
use crate::olm::SenderData;
@@ -323,6 +323,14 @@ pub trait CryptoStore: AsyncTraitDeps {
room_id: &RoomId,
) -> Result<Option<RoomSettings>, Self::Error>;
/// Get the details about the room key bundle data received from the given
/// user for the given room.
async fn get_received_room_key_bundle_data(
&self,
room_id: &RoomId,
user_id: &UserId,
) -> Result<Option<StoredRoomKeyBundleData>, Self::Error>;
/// Get arbitrary data from the store
///
/// # Arguments
@@ -569,6 +577,14 @@ impl<T: CryptoStore> CryptoStore for EraseCryptoStoreError<T> {
self.0.get_room_settings(room_id).await.map_err(Into::into)
}
async fn get_received_room_key_bundle_data(
&self,
room_id: &RoomId,
user_id: &UserId,
) -> Result<Option<StoredRoomKeyBundleData>> {
self.0.get_received_room_key_bundle_data(room_id, user_id).await.map_err(Into::into)
}
async fn get_custom_value(&self, key: &str) -> Result<Option<Vec<u8>>, Self::Error> {
self.0.get_custom_value(key).await.map_err(Into::into)
}

View File

@@ -30,7 +30,11 @@ use super::{
secret_send::SecretSendContent,
EventType,
};
use crate::types::{deserialize_ed25519_key, events::from_str, serialize_ed25519_key, DeviceKeys};
use crate::types::{
deserialize_ed25519_key,
events::{from_str, room_key_bundle::RoomKeyBundleContent},
serialize_ed25519_key, DeviceKeys,
};
/// An `m.dummy` event that was decrypted using the
/// `m.olm.v1.curve25519-aes-sha2` algorithm
@@ -77,6 +81,10 @@ impl DecryptedForwardedRoomKeyEvent {
/// `m.olm.v1.curve25519-aes-sha2` algorithm
pub type DecryptedSecretSendEvent = DecryptedOlmV1Event<SecretSendContent>;
/// An `io.element.msc4268.room_key_bundle` to-device event which has
/// been decrypted using using the `m.olm.v1.curve25519-aes-sha2` algorithm
pub type DecryptedRoomKeyBundleEvent = DecryptedOlmV1Event<RoomKeyBundleContent>;
/// An enum over the various events that were decrypted using the
/// `m.olm.v1.curve25519-aes-sha2` algorithm.
#[derive(Debug)]
@@ -89,6 +97,8 @@ pub enum AnyDecryptedOlmEvent {
SecretSend(DecryptedSecretSendEvent),
/// The `m.dummy` decrypted to-device event.
Dummy(DecryptedDummyEvent),
/// The `io.element.msc4268.room_key_bundle` decrypted to-device event.
RoomKeyBundle(DecryptedRoomKeyBundleEvent),
/// A decrypted to-device event of an unknown or custom type.
Custom(Box<ToDeviceCustomEvent>),
}
@@ -101,6 +111,7 @@ impl AnyDecryptedOlmEvent {
AnyDecryptedOlmEvent::ForwardedRoomKey(e) => &e.sender,
AnyDecryptedOlmEvent::SecretSend(e) => &e.sender,
AnyDecryptedOlmEvent::Custom(e) => &e.sender,
AnyDecryptedOlmEvent::RoomKeyBundle(e) => &e.sender,
AnyDecryptedOlmEvent::Dummy(e) => &e.sender,
}
}
@@ -112,6 +123,7 @@ impl AnyDecryptedOlmEvent {
AnyDecryptedOlmEvent::ForwardedRoomKey(e) => &e.recipient,
AnyDecryptedOlmEvent::SecretSend(e) => &e.recipient,
AnyDecryptedOlmEvent::Custom(e) => &e.recipient,
AnyDecryptedOlmEvent::RoomKeyBundle(e) => &e.recipient,
AnyDecryptedOlmEvent::Dummy(e) => &e.recipient,
}
}
@@ -123,6 +135,7 @@ impl AnyDecryptedOlmEvent {
AnyDecryptedOlmEvent::ForwardedRoomKey(e) => &e.keys,
AnyDecryptedOlmEvent::SecretSend(e) => &e.keys,
AnyDecryptedOlmEvent::Custom(e) => &e.keys,
AnyDecryptedOlmEvent::RoomKeyBundle(e) => &e.keys,
AnyDecryptedOlmEvent::Dummy(e) => &e.keys,
}
}
@@ -134,6 +147,7 @@ impl AnyDecryptedOlmEvent {
AnyDecryptedOlmEvent::ForwardedRoomKey(e) => &e.recipient_keys,
AnyDecryptedOlmEvent::SecretSend(e) => &e.recipient_keys,
AnyDecryptedOlmEvent::Custom(e) => &e.recipient_keys,
AnyDecryptedOlmEvent::RoomKeyBundle(e) => &e.recipient_keys,
AnyDecryptedOlmEvent::Dummy(e) => &e.recipient_keys,
}
}
@@ -145,6 +159,7 @@ impl AnyDecryptedOlmEvent {
AnyDecryptedOlmEvent::RoomKey(e) => e.content.event_type(),
AnyDecryptedOlmEvent::ForwardedRoomKey(e) => e.content.event_type(),
AnyDecryptedOlmEvent::SecretSend(e) => e.content.event_type(),
AnyDecryptedOlmEvent::RoomKeyBundle(e) => e.content.event_type(),
AnyDecryptedOlmEvent::Dummy(e) => e.content.event_type(),
}
}
@@ -156,6 +171,7 @@ impl AnyDecryptedOlmEvent {
AnyDecryptedOlmEvent::RoomKey(e) => e.sender_device_keys.as_ref(),
AnyDecryptedOlmEvent::ForwardedRoomKey(e) => e.sender_device_keys.as_ref(),
AnyDecryptedOlmEvent::SecretSend(e) => e.sender_device_keys.as_ref(),
AnyDecryptedOlmEvent::RoomKeyBundle(e) => e.sender_device_keys.as_ref(),
AnyDecryptedOlmEvent::Dummy(e) => e.sender_device_keys.as_ref(),
}
}
@@ -312,7 +328,9 @@ impl<'de> Deserialize<'de> for AnyDecryptedOlmEvent {
"m.forwarded_room_key" => AnyDecryptedOlmEvent::ForwardedRoomKey(from_str(json)?),
"m.secret.send" => AnyDecryptedOlmEvent::SecretSend(from_str(json)?),
"m.dummy" => AnyDecryptedOlmEvent::Dummy(from_str(json)?),
RoomKeyBundleContent::EVENT_TYPE => {
AnyDecryptedOlmEvent::RoomKeyBundle(from_str(json)?)
}
_ => AnyDecryptedOlmEvent::Custom(from_str(json)?),
})
}

View File

@@ -30,7 +30,7 @@ use matrix_sdk_crypto::{
},
store::{
BackupKeys, Changes, CryptoStore, CryptoStoreError, DehydratedDeviceKey, PendingChanges,
RoomKeyCounts, RoomSettings,
RoomKeyCounts, RoomSettings, StoredRoomKeyBundleData,
},
types::events::room_key_withheld::RoomKeyWithheldEvent,
vodozemac::base64_encode,
@@ -1359,6 +1359,12 @@ impl_crypto_store! {
.transpose()
}
#[allow(clippy::unused_async)]
async fn get_received_room_key_bundle_data(&self, _room_id: &RoomId, _user_id: &UserId) -> Result<Option<StoredRoomKeyBundleData>> {
// TODO: not yet implemented for indexeddb
Ok(None)
}
async fn get_custom_value(&self, key: &str) -> Result<Option<Vec<u8>>> {
self
.inner

View File

@@ -0,0 +1,9 @@
CREATE TABLE "received_room_key_bundle"
(
"room_id" BLOB NOT NULL,
"sender_user_id" BLOB NOT NULL,
"bundle_data" BLOB NOT NULL
);
CREATE UNIQUE INDEX "received_room_key_bundle_room_id_user_id_idx"
ON "received_room_key_bundle" ("room_id", "sender_user_id");

View File

@@ -29,7 +29,7 @@ use matrix_sdk_crypto::{
},
store::{
BackupKeys, Changes, CryptoStore, DehydratedDeviceKey, PendingChanges, RoomKeyCounts,
RoomSettings,
RoomSettings, StoredRoomKeyBundleData,
},
types::events::room_key_withheld::RoomKeyWithheldEvent,
Account, DeviceData, GossipRequest, GossippedSecret, SecretInfo, TrackedUser, UserIdentityData,
@@ -111,6 +111,7 @@ impl SqliteCryptoStore {
let conn = pool.get().await?;
let version = conn.db_version().await?;
debug!("Opened sqlite store with version {}", version);
run_migrations(&conn, version).await?;
let store_cipher = match passphrase {
@@ -207,7 +208,7 @@ impl SqliteCryptoStore {
}
}
const DATABASE_VERSION: u8 = 9;
const DATABASE_VERSION: u8 = 10;
/// key for the dehydrated device pickle key in the key/value table.
const DEHYDRATED_DEVICE_PICKLE_KEY: &str = "dehydrated_device_pickle_key";
@@ -303,6 +304,16 @@ async fn run_migrations(conn: &SqliteAsyncConn, version: u8) -> Result<()> {
.await?;
}
if version < 10 {
conn.with_transaction(|txn| {
txn.execute_batch(include_str!(
"../migrations/crypto_store/010_received_room_key_bundles.sql"
))?;
txn.set_db_version(10)
})
.await?;
}
Ok(())
}
@@ -350,6 +361,13 @@ trait SqliteConnectionExt {
fn set_room_settings(&self, room_id: &[u8], data: &[u8]) -> rusqlite::Result<()>;
fn set_secret(&self, request_id: &[u8], data: &[u8]) -> rusqlite::Result<()>;
fn set_received_room_key_bundle(
&self,
room_id: &[u8],
user_id: &[u8],
data: &[u8],
) -> rusqlite::Result<()>;
}
impl SqliteConnectionExt for rusqlite::Connection {
@@ -478,6 +496,21 @@ impl SqliteConnectionExt for rusqlite::Connection {
Ok(())
}
fn set_received_room_key_bundle(
&self,
room_id: &[u8],
sender_user_id: &[u8],
data: &[u8],
) -> rusqlite::Result<()> {
self.execute(
"INSERT INTO received_room_key_bundle(room_id, sender_user_id, bundle_data)
VALUES (?1, ?2, ?3)
ON CONFLICT (room_id, sender_user_id) DO UPDATE SET bundle_data = ?3",
(room_id, sender_user_id, data),
)?;
Ok(())
}
}
#[async_trait]
@@ -744,6 +777,21 @@ trait SqliteObjectCryptoStoreExt: SqliteAsyncConnExt {
.await
.optional()?)
}
async fn get_received_room_key_bundle(
&self,
room_id: Key,
sender_user: Key,
) -> Result<Option<Vec<u8>>> {
Ok(self
.query_row(
"SELECT bundle_data FROM received_room_key_bundle WHERE room_id = ? AND sender_user = ?",
(room_id, sender_user),
|row| { row.get(0) },
)
.await
.optional()?)
}
}
#[async_trait]
@@ -947,6 +995,14 @@ impl CryptoStore for SqliteCryptoStore {
txn.set_secret(&secret_name, &value)?;
}
for bundle in changes.received_room_key_bundles {
let room_id =
this.encode_key("received_room_key_bundle", &bundle.bundle_data.room_id);
let user_id = this.encode_key("received_room_key_bundle", &bundle.sender_user);
let value = this.serialize_value(&bundle)?;
txn.set_received_room_key_bundle(&room_id, &user_id, &value)?;
}
Ok::<_, Error>(())
})
.await?;
@@ -1337,6 +1393,21 @@ impl CryptoStore for SqliteCryptoStore {
return Ok(Some(settings));
}
async fn get_received_room_key_bundle_data(
&self,
room_id: &RoomId,
user_id: &UserId,
) -> Result<Option<StoredRoomKeyBundleData>> {
let room_id = self.encode_key("received_room_key_bundle", room_id);
let user_id = self.encode_key("received_room_key_bundle", user_id);
self.acquire()
.await?
.get_received_room_key_bundle(room_id, user_id)
.await?
.map(|value| self.deserialize_value(&value))
.transpose()
}
async fn get_custom_value(&self, key: &str) -> Result<Option<Vec<u8>>> {
let Some(serialized) = self.acquire().await?.get_kv(key).await? else {
return Ok(None);