mirror of
https://github.com/matrix-org/matrix-rust-sdk.git
synced 2026-05-14 02:55:47 -04:00
TERRIBLE HACK: save the pos value in the crypto store
This commit is contained in:
@@ -15,7 +15,10 @@ use super::{
|
||||
FrozenSlidingSync, FrozenSlidingSyncList, SlidingSync, SlidingSyncList,
|
||||
SlidingSyncPositionMarkers,
|
||||
};
|
||||
use crate::{sliding_sync::SlidingSyncListCachePolicy, Client, Result};
|
||||
use crate::{
|
||||
sliding_sync::{FrozenSlidingSyncPos, SlidingSyncListCachePolicy},
|
||||
Client, Result,
|
||||
};
|
||||
|
||||
/// Be careful: as this is used as a storage key; changing it requires migrating
|
||||
/// data!
|
||||
@@ -57,9 +60,17 @@ async fn clean_storage(
|
||||
for list_name in lists.keys() {
|
||||
invalidate_cached_list(storage, storage_key, list_name).await;
|
||||
}
|
||||
let _ = storage
|
||||
.remove_custom_value(format_storage_key_for_sliding_sync(storage_key).as_bytes())
|
||||
.await;
|
||||
let instance_storage_key = format_storage_key_for_sliding_sync(storage_key);
|
||||
let _ = storage.remove_custom_value(instance_storage_key.as_bytes()).await;
|
||||
|
||||
#[cfg(feature = "e2e-encryption")]
|
||||
if let Some(olm_machine) = &*client.olm_machine().await {
|
||||
// Invalidate the value stored for the TERRIBLE HACK.
|
||||
let _ = olm_machine
|
||||
.store()
|
||||
.set_custom_value(&instance_storage_key, "".as_bytes().to_vec())
|
||||
.await;
|
||||
}
|
||||
}
|
||||
|
||||
/// Store the `SlidingSync`'s state in the storage.
|
||||
@@ -82,6 +93,18 @@ pub(super) async fn store_sliding_sync_state(
|
||||
)
|
||||
.await?;
|
||||
|
||||
#[cfg(feature = "e2e-encryption")]
|
||||
{
|
||||
// FIXME (TERRIBLE HACK): we want to save `pos` in a cross-process safe manner,
|
||||
// with both processes sharing the same database backend; that needs to
|
||||
// go in the crypto process store at the moment, but should be fixed
|
||||
// later on.
|
||||
if let Some(olm_machine) = &*sliding_sync.inner.client.olm_machine().await {
|
||||
let pos_blob = serde_json::to_vec(&FrozenSlidingSyncPos { pos: position.pos.clone() })?;
|
||||
olm_machine.store().set_custom_value(&instance_storage_key, pos_blob).await?;
|
||||
}
|
||||
}
|
||||
|
||||
// Write every `SlidingSyncList` that's configured for caching into the store.
|
||||
let frozen_lists = {
|
||||
let rooms_lock = sliding_sync.inner.rooms.read().await;
|
||||
@@ -190,19 +213,16 @@ pub(super) async fn restore_sliding_sync_state(
|
||||
}
|
||||
|
||||
let storage = client.store();
|
||||
let instance_storage_key = format_storage_key_for_sliding_sync(storage_key);
|
||||
|
||||
// Preload the `SlidingSync` object from the cache.
|
||||
match storage
|
||||
.get_custom_value(format_storage_key_for_sliding_sync(storage_key).as_bytes())
|
||||
.get_custom_value(instance_storage_key.as_bytes())
|
||||
.await?
|
||||
.map(|custom_value| serde_json::from_slice::<FrozenSlidingSync>(&custom_value))
|
||||
{
|
||||
// `SlidingSync` has been found and successfully deserialized.
|
||||
Some(Ok(FrozenSlidingSync {
|
||||
to_device_since,
|
||||
delta_token: frozen_delta_token,
|
||||
pos: frozen_pos,
|
||||
})) => {
|
||||
Some(Ok(FrozenSlidingSync { to_device_since, delta_token: frozen_delta_token })) => {
|
||||
trace!("Successfully read the `SlidingSync` from the cache");
|
||||
// Only update the to-device token if we failed to read it from the crypto store
|
||||
// above.
|
||||
@@ -211,7 +231,22 @@ pub(super) async fn restore_sliding_sync_state(
|
||||
}
|
||||
|
||||
restored_fields.delta_token = frozen_delta_token;
|
||||
restored_fields.pos = frozen_pos;
|
||||
|
||||
#[cfg(feature = "e2e-encryption")]
|
||||
{
|
||||
if let Some(olm_machine) = &*client.olm_machine().await {
|
||||
if let Ok(Some(blob)) =
|
||||
olm_machine.store().get_custom_value(&instance_storage_key).await
|
||||
{
|
||||
if let Ok(frozen_pos) =
|
||||
serde_json::from_slice::<FrozenSlidingSyncPos>(&blob)
|
||||
{
|
||||
trace!("Successfully read the `Sliding Sync` pos from the crypto store cache");
|
||||
restored_fields.pos = frozen_pos.pos;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// `SlidingSync` has been found, but it wasn't possible to deserialize it. It's
|
||||
@@ -445,7 +480,6 @@ mod tests {
|
||||
Some(bytes) => {
|
||||
let deserialized: FrozenSlidingSync = serde_json::from_slice(&bytes)?;
|
||||
assert_eq!(deserialized.delta_token, Some(delta_token.clone()));
|
||||
assert_eq!(deserialized.pos, Some(pos.clone()));
|
||||
assert!(deserialized.to_device_since.is_none());
|
||||
}
|
||||
);
|
||||
@@ -481,7 +515,6 @@ mod tests {
|
||||
serde_json::to_vec(&FrozenSlidingSync {
|
||||
to_device_since: Some(to_device_token.clone()),
|
||||
delta_token: Some(delta_token.clone()),
|
||||
pos: Some(pos.clone()),
|
||||
})?,
|
||||
)
|
||||
.await?;
|
||||
|
||||
@@ -877,21 +877,21 @@ struct FrozenSlidingSync {
|
||||
to_device_since: Option<String>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
delta_token: Option<String>,
|
||||
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||
pos: Option<String>,
|
||||
}
|
||||
|
||||
impl FrozenSlidingSync {
|
||||
async fn new(position: &SlidingSyncPositionMarkers) -> Self {
|
||||
// The to-device token must be saved in the `FrozenCryptoSlidingSync` now.
|
||||
Self {
|
||||
delta_token: position.delta_token.clone(),
|
||||
to_device_since: None,
|
||||
pos: position.pos.clone(),
|
||||
}
|
||||
Self { delta_token: position.delta_token.clone(), to_device_since: None }
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
struct FrozenSlidingSyncPos {
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pos: Option<String>,
|
||||
}
|
||||
|
||||
/// A summary of the updates received after a sync (like in
|
||||
/// [`SlidingSync::sync`]).
|
||||
#[derive(Debug, Clone)]
|
||||
@@ -1613,6 +1613,7 @@ mod tests {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(feature = "e2e-encryption")]
|
||||
#[async_test]
|
||||
async fn test_sliding_sync_doesnt_remember_pos() -> Result<()> {
|
||||
let server = MockServer::start().await;
|
||||
@@ -1707,6 +1708,7 @@ mod tests {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(feature = "e2e-encryption")]
|
||||
#[async_test]
|
||||
async fn test_sliding_sync_does_remember_pos() -> Result<()> {
|
||||
let server = MockServer::start().await;
|
||||
|
||||
Reference in New Issue
Block a user