Merge pull request #3320 from matrix-org/andybalaam/fast-backup-reset-in-memorystore2

crypto: MemoryStore uses backup versions to track which sessions are backed up
This commit is contained in:
Andy Balaam
2024-05-21 09:57:26 +01:00
committed by GitHub
4 changed files with 443 additions and 38 deletions

View File

@@ -29,6 +29,9 @@ Breaking changes:
Additions:
- Expose new method `BackupMachine::backup_version`.
([#3320](https://github.com/matrix-org/matrix-rust-sdk/pull/3320))
- Add data types to parse the QR code data for the QR code login defined in
[MSC4108](https://github.com/matrix-org/matrix-spec-proposals/pull/4108)

View File

@@ -419,6 +419,13 @@ impl BackupMachine {
Ok(())
}
/// Provide the `backup_version` of the current `backup_key`, or None if
/// there is no current key, or the key is not used with any backup
/// version.
pub async fn backup_version(&self) -> Option<String> {
self.backup_key.read().await.as_ref().and_then(|k| k.backup_version())
}
/// Store the backup decryption key in the crypto store.
///
/// This is useful if the client wants to support gossiping of the backup

View File

@@ -375,38 +375,42 @@ macro_rules! cryptostore_integration_tests {
#[async_test]
async fn reset_inbound_group_session_for_backup() {
// Given a store exists where all sessions are backed up to backup_1
let (account, store) =
get_loaded_store("reset_inbound_group_session_for_backup").await;
assert_eq!(store.inbound_group_session_counts(None).await.unwrap().total, 0);
let room_id = &room_id!("!test:localhost");
let (_, session) = account.create_group_session_pair_with_defaults(room_id).await;
let changes =
Changes { inbound_group_sessions: vec![session.clone()], ..Default::default() };
let mut sessions: Vec<InboundGroupSession> = Vec::with_capacity(10);
for _ in 0..10 {
sessions.push(account.create_group_session_pair_with_defaults(room_id).await.1);
}
let changes = Changes { inbound_group_sessions: sessions.clone(), ..Default::default() };
store.save_changes(changes).await.expect("Can't save group session");
assert_eq!(store.inbound_group_sessions_for_backup("backup_1", 100).await.unwrap().len(), 10);
store.mark_inbound_group_sessions_as_backed_up(
"backup_1",
&(0..10).map(|i| session_info(&sessions[i])).collect::<Vec<_>>(),
).await.expect("Failed to mark sessions as backed up");
// Given we have backed up our session
store
.mark_inbound_group_sessions_as_backed_up("bkpver1", &[session_info(&session)])
.await
.expect("Failed to mark_inbound_group_sessions_as_backed_up.");
// Sanity: none need backing up to the same backup
{
let to_back_up_old = store.inbound_group_sessions_for_backup("backup_1", 10).await.unwrap();
assert_eq!(to_back_up_old.len(), 0);
}
assert_eq!(store.inbound_group_session_counts(Some("bkpver1")).await.unwrap().total, 1);
assert_eq!(store.inbound_group_session_counts(Some("bkpver1")).await.unwrap().backed_up, 1);
// Some stores ignore backup_version and just reset when you tell them to. Tell
// them here.
store.reset_backup_state().await.expect("reset failed");
// Sanity: before resetting, we have nothing to back up
let to_back_up = store.inbound_group_sessions_for_backup("bkpver1", 1).await.unwrap();
assert_eq!(to_back_up, vec![]);
// When we ask what needs backing up to a different backup version
let to_back_up = store.inbound_group_sessions_for_backup("backup_02", 10).await.unwrap();
// When we reset the backup
store.reset_backup_state().await.unwrap();
// Then after resetting, even if we supply the same backup version number, we need
// to back up the session
let to_back_up = store.inbound_group_sessions_for_backup("bkpver1", 1).await.unwrap();
assert_eq!(to_back_up, vec![session]);
// Then the answer is everything
let needs_backing_up = |i: usize| to_back_up.iter().any(|s| s.session_id() == sessions[i].session_id());
assert!(needs_backing_up(0));
assert!(needs_backing_up(1));
assert!(needs_backing_up(8));
assert!(needs_backing_up(9));
assert_eq!(to_back_up.len(), 10);
}
#[async_test]

View File

@@ -48,12 +48,35 @@ fn encode_key_info(info: &SecretInfo) -> String {
}
}
type SessionId = String;
/// The "version" of a backup - newtype wrapper around a String.
#[derive(Clone, Debug, PartialEq)]
struct BackupVersion(String);
impl BackupVersion {
fn from(s: &str) -> Self {
Self(s.to_owned())
}
fn as_str(&self) -> &str {
&self.0
}
}
/// An in-memory only store that will forget all the E2EE key once it's dropped.
#[derive(Debug)]
pub struct MemoryStore {
account: StdRwLock<Option<Account>>,
sessions: SessionStore,
inbound_group_sessions: GroupSessionStore,
/// Map room id -> session id -> backup order number
/// The latest backup in which this session is stored. Equivalent to
/// `backed_up_to` in [`IndexedDbCryptoStore`]
inbound_group_sessions_backed_up_to:
StdRwLock<HashMap<OwnedRoomId, HashMap<SessionId, BackupVersion>>>,
outbound_group_sessions: StdRwLock<BTreeMap<OwnedRoomId, OutboundGroupSession>>,
private_identity: StdRwLock<Option<PrivateCrossSigningIdentity>>,
tracked_users: StdRwLock<HashMap<OwnedUserId, TrackedUser>>,
@@ -77,6 +100,7 @@ impl Default for MemoryStore {
account: Default::default(),
sessions: SessionStore::new(),
inbound_group_sessions: GroupSessionStore::new(),
inbound_group_sessions_backed_up_to: Default::default(),
outbound_group_sessions: Default::default(),
private_identity: Default::default(),
tracked_users: Default::default(),
@@ -136,6 +160,32 @@ impl MemoryStore {
fn save_private_identity(&self, private_identity: Option<PrivateCrossSigningIdentity>) {
*self.private_identity.write().unwrap() = private_identity;
}
/// Return all the [`InboundGroupSession`]s we have, paired with the
/// `backed_up_to` value for each one (or "" where it is missing, which
/// should never happen).
async fn get_inbound_group_sessions_and_backed_up_to(
&self,
) -> Result<Vec<(InboundGroupSession, Option<BackupVersion>)>> {
let lookup = |s: &InboundGroupSession| {
self.inbound_group_sessions_backed_up_to
.read()
.unwrap()
.get(&s.room_id)?
.get(s.session_id())
.cloned()
};
Ok(self
.get_inbound_group_sessions()
.await?
.into_iter()
.map(|s| {
let v = lookup(&s);
(s, v)
})
.collect())
}
}
type Result<T> = std::result::Result<T, Infallible>;
@@ -280,47 +330,80 @@ impl CryptoStore for MemoryStore {
async fn inbound_group_session_counts(
&self,
_backup_version: Option<&str>,
backup_version: Option<&str>,
) -> Result<RoomKeyCounts> {
let backed_up =
self.get_inbound_group_sessions().await?.into_iter().filter(|s| s.backed_up()).count();
let backed_up = if let Some(backup_version) = backup_version {
self.get_inbound_group_sessions_and_backed_up_to()
.await?
.into_iter()
// Count the sessions backed up in the required backup
.filter(|(_, o)| o.as_ref().is_some_and(|o| o.as_str() == backup_version))
.count()
} else {
// We asked about a nonexistent backup version - this doesn't make much sense,
// but we can easily answer that nothing is backed up in this
// nonexistent backup.
0
};
Ok(RoomKeyCounts { total: self.inbound_group_sessions.count(), backed_up })
}
async fn inbound_group_sessions_for_backup(
&self,
_backup_version: &str,
backup_version: &str,
limit: usize,
) -> Result<Vec<InboundGroupSession>> {
Ok(self
.get_inbound_group_sessions()
.get_inbound_group_sessions_and_backed_up_to()
.await?
.into_iter()
.filter(|s| !s.backed_up())
.filter_map(|(session, backed_up_to)| {
if let Some(ref existing_version) = backed_up_to {
if existing_version.as_str() == backup_version {
// This session is already backed up in the required backup
return None;
}
}
// It's not backed up, or it's backed up in a different backup
Some(session)
})
.take(limit)
.collect())
}
async fn mark_inbound_group_sessions_as_backed_up(
&self,
_backup_version: &str,
backup_version: &str,
room_and_session_ids: &[(&RoomId, &str)],
) -> Result<()> {
for (room_id, session_id) in room_and_session_ids {
let mut inbound_group_sessions_backed_up_to =
self.inbound_group_sessions_backed_up_to.write().unwrap();
for &(room_id, session_id) in room_and_session_ids {
let session = self.inbound_group_sessions.get(room_id, session_id);
if let Some(session) = session {
session.mark_as_backed_up();
inbound_group_sessions_backed_up_to
.entry(room_id.to_owned())
.or_default()
.insert(session_id.to_owned(), BackupVersion::from(backup_version));
self.inbound_group_sessions.add(session);
}
}
Ok(())
}
async fn reset_backup_state(&self) -> Result<()> {
for session in self.get_inbound_group_sessions().await? {
session.reset_backup_state();
}
// Nothing to do here, because we remember which backup versions we backed up to
// in `mark_inbound_group_sessions_as_backed_up`, so we don't need to
// reset anything here because the required version is passed in to
// `inbound_group_sessions_for_backup`, and we can compare against the
// version we stored.
Ok(())
}
@@ -497,15 +580,18 @@ impl CryptoStore for MemoryStore {
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use matrix_sdk_test::async_test;
use ruma::{room_id, user_id};
use ruma::{room_id, user_id, RoomId};
use vodozemac::{Curve25519PublicKey, Ed25519PublicKey};
use super::SessionId;
use crate::{
identities::device::testing::get_device,
olm::{
tests::get_account_and_session_test_helper, InboundGroupSession, OlmMessageHash,
PrivateCrossSigningIdentity,
tests::get_account_and_session_test_helper, Account, InboundGroupSession,
OlmMessageHash, PrivateCrossSigningIdentity,
},
store::{memorystore::MemoryStore, Changes, CryptoStore, PendingChanges},
};
@@ -553,6 +639,169 @@ mod tests {
assert_eq!(inbound, loaded_session);
}
#[async_test]
async fn test_backing_up_marks_sessions_as_backed_up() {
// Given there are 2 sessions
let room_id = room_id!("!test:localhost");
let (store, sessions) = store_with_sessions(2, room_id).await;
// When I mark them as backed up
mark_backed_up(&store, room_id, "bkp1", &sessions).await;
// Then their backed_up_to field is set
let but = backed_up_tos(&store).await;
assert_eq!(but[sessions[0].session_id()], "bkp1");
assert_eq!(but[sessions[1].session_id()], "bkp1");
}
#[async_test]
async fn test_backing_up_a_second_set_of_sessions_updates_their_backup_order() {
// Given there are 3 sessions
let room_id = room_id!("!test:localhost");
let (store, sessions) = store_with_sessions(3, room_id).await;
// When I mark 0 and 1 as backed up in bkp1
mark_backed_up(&store, room_id, "bkp1", &sessions[..2]).await;
// And 1 and 2 as backed up in bkp2
mark_backed_up(&store, room_id, "bkp2", &sessions[1..]).await;
// Then 0 is backed up in bkp1 and the 1 and 2 are backed up in bkp2
let but = backed_up_tos(&store).await;
assert_eq!(but[sessions[0].session_id()], "bkp1");
assert_eq!(but[sessions[1].session_id()], "bkp2");
assert_eq!(but[sessions[2].session_id()], "bkp2");
}
#[async_test]
async fn test_backing_up_again_to_the_same_version_has_no_effect() {
// Given there are 3 sessions
let room_id = room_id!("!test:localhost");
let (store, sessions) = store_with_sessions(3, room_id).await;
// When I mark the first two as backed up in the first backup
mark_backed_up(&store, room_id, "bkp1", &sessions[..2]).await;
// And the last 2 as backed up in the same backup version
mark_backed_up(&store, room_id, "bkp1", &sessions[1..]).await;
// Then they all get the same backed_up_to value
let but = backed_up_tos(&store).await;
assert_eq!(but[sessions[0].session_id()], "bkp1");
assert_eq!(but[sessions[1].session_id()], "bkp1");
assert_eq!(but[sessions[2].session_id()], "bkp1");
}
#[async_test]
async fn test_backing_up_to_an_old_backup_version_can_increase_backed_up_to() {
// Given we have backed up some sessions to 2 backup versions, an older and a
// newer
let room_id = room_id!("!test:localhost");
let (store, sessions) = store_with_sessions(4, room_id).await;
mark_backed_up(&store, room_id, "older_bkp", &sessions[..2]).await;
mark_backed_up(&store, room_id, "newer_bkp", &sessions[1..2]).await;
// When I ask to back up the un-backed-up ones to the older backup
mark_backed_up(&store, room_id, "older_bkp", &sessions[2..]).await;
// Then each session lists the backup it was most recently included in
let but = backed_up_tos(&store).await;
assert_eq!(but[sessions[0].session_id()], "older_bkp");
assert_eq!(but[sessions[1].session_id()], "newer_bkp");
assert_eq!(but[sessions[2].session_id()], "older_bkp");
assert_eq!(but[sessions[3].session_id()], "older_bkp");
}
#[async_test]
async fn test_backing_up_to_an_old_backup_version_overwrites_a_newer_one() {
// Given we have backed up to 2 backup versions, an older and a newer
let room_id = room_id!("!test:localhost");
let (store, sessions) = store_with_sessions(4, room_id).await;
mark_backed_up(&store, room_id, "older_bkp", &sessions).await;
// Sanity: they are backed up in order number 1
assert_eq!(backed_up_tos(&store).await[sessions[0].session_id()], "older_bkp");
mark_backed_up(&store, room_id, "newer_bkp", &sessions).await;
// Sanity: they are backed up in order number 2
assert_eq!(backed_up_tos(&store).await[sessions[0].session_id()], "newer_bkp");
// When I ask to back up some to the older version
mark_backed_up(&store, room_id, "older_bkp", &sessions[..2]).await;
// Then older backup overwrites: we don't consider the order here at all
let but = backed_up_tos(&store).await;
assert_eq!(but[sessions[0].session_id()], "older_bkp");
assert_eq!(but[sessions[1].session_id()], "older_bkp");
assert_eq!(but[sessions[2].session_id()], "newer_bkp");
assert_eq!(but[sessions[3].session_id()], "newer_bkp");
}
#[async_test]
async fn test_not_backed_up_sessions_are_eligible_for_backup() {
// Given there are 4 sessions, 2 of which are already backed up
let room_id = room_id!("!test:localhost");
let (store, sessions) = store_with_sessions(4, room_id).await;
mark_backed_up(&store, room_id, "bkp1", &sessions[..2]).await;
// When I ask which to back up
let mut to_backup = store
.inbound_group_sessions_for_backup("bkp1", 10)
.await
.expect("Failed to ask for sessions to backup");
to_backup.sort_by_key(|s| s.session_id().to_owned());
// Then I am told the last 2 only
assert_eq!(to_backup, &[sessions[2].clone(), sessions[3].clone()]);
}
#[async_test]
async fn test_all_sessions_are_eligible_for_backup_if_version_is_unknown() {
// Given there are 4 sessions, 2 of which are already backed up in bkp1
let room_id = room_id!("!test:localhost");
let (store, sessions) = store_with_sessions(4, room_id).await;
mark_backed_up(&store, room_id, "bkp1", &sessions[..2]).await;
// When I ask which to back up in an unknown version
let mut to_backup = store
.inbound_group_sessions_for_backup("unknown_bkp", 10)
.await
.expect("Failed to ask for sessions to backup");
to_backup.sort_by_key(|s| s.session_id().to_owned());
// Then I am told to back up all of them
assert_eq!(
to_backup,
&[sessions[0].clone(), sessions[1].clone(), sessions[2].clone(), sessions[3].clone()]
);
}
#[async_test]
async fn test_sessions_backed_up_to_a_later_version_are_eligible_for_backup() {
// Given there are 4 sessions, some backed up to three different versions
let room_id = room_id!("!test:localhost");
let (store, sessions) = store_with_sessions(4, room_id).await;
mark_backed_up(&store, room_id, "bkp0", &sessions[..1]).await;
mark_backed_up(&store, room_id, "bkp1", &sessions[1..2]).await;
mark_backed_up(&store, room_id, "bkp2", &sessions[2..3]).await;
// When I ask which to back up in the middle version
let mut to_backup = store
.inbound_group_sessions_for_backup("bkp1", 10)
.await
.expect("Failed to ask for sessions to backup");
to_backup.sort_by_key(|s| s.session_id().to_owned());
// Then I am told to back up everything not in the version I asked about
assert_eq!(
to_backup,
&[
sessions[0].clone(), // Backed up in bkp0
// sessions[1] is backed up in bkp1 already, which we asked about
sessions[2].clone(), // Backed up in bkp2
sessions[3].clone(), // Not backed up
]
);
}
#[async_test]
async fn test_outbound_group_session_store() {
// Given an outbound session
@@ -653,6 +902,148 @@ mod tests {
store.save_changes(changes).await.unwrap();
assert!(store.is_message_known(&hash).await.unwrap());
}
#[async_test]
async fn test_key_counts_of_empty_store_are_zero() {
// Given an empty store
let store = MemoryStore::new();
// When we count keys
let key_counts = store.inbound_group_session_counts(Some("")).await.unwrap();
// Then the answer is zero
assert_eq!(key_counts.total, 0);
assert_eq!(key_counts.backed_up, 0);
}
#[async_test]
async fn test_counting_sessions_reports_the_number_of_sessions() {
// Given a store with sessions
let room_id = room_id!("!test:localhost");
let (store, _) = store_with_sessions(4, room_id).await;
// When we count keys
let key_counts = store.inbound_group_session_counts(Some("bkp")).await.unwrap();
// Then the answer equals the number of sessions we created
assert_eq!(key_counts.total, 4);
// And none are backed up
assert_eq!(key_counts.backed_up, 0);
}
#[async_test]
async fn test_counting_backed_up_sessions_reports_the_number_backed_up_in_this_backup() {
// Given a store with sessions, some backed up
let room_id = room_id!("!test:localhost");
let (store, sessions) = store_with_sessions(5, room_id).await;
mark_backed_up(&store, room_id, "bkp", &sessions[..2]).await;
// When we count keys
let key_counts = store.inbound_group_session_counts(Some("bkp")).await.unwrap();
// Then the answer equals the number of sessions we created
assert_eq!(key_counts.total, 5);
// And the backed_up count matches how many were backed up
assert_eq!(key_counts.backed_up, 2);
}
#[async_test]
async fn test_counting_backed_up_sessions_for_null_backup_reports_zero() {
// Given a store with sessions, some backed up
let room_id = room_id!("!test:localhost");
let (store, sessions) = store_with_sessions(4, room_id).await;
mark_backed_up(&store, room_id, "bkp", &sessions[..2]).await;
// When we count keys, providing None as the backup version
let key_counts = store.inbound_group_session_counts(None).await.unwrap();
// Then we ignore everything and just say zero
assert_eq!(key_counts.backed_up, 0);
}
#[async_test]
async fn test_counting_backed_up_sessions_only_reports_sessions_in_the_version_specified() {
// Given a store with sessions, backed up in several versions
let room_id = room_id!("!test:localhost");
let (store, sessions) = store_with_sessions(4, room_id).await;
mark_backed_up(&store, room_id, "bkp1", &sessions[..2]).await;
mark_backed_up(&store, room_id, "bkp2", &sessions[3..]).await;
// When we count keys for bkp2
let key_counts = store.inbound_group_session_counts(Some("bkp2")).await.unwrap();
// Then the backed_up count reflects how many were backed up in bkp2 only
assert_eq!(key_counts.backed_up, 1);
}
/// Mark the supplied sessions as backed up in the supplied backup version
async fn mark_backed_up(
store: &MemoryStore,
room_id: &RoomId,
backup_version: &str,
sessions: &[InboundGroupSession],
) {
let rooms_and_ids: Vec<_> = sessions.iter().map(|s| (room_id, s.session_id())).collect();
store
.mark_inbound_group_sessions_as_backed_up(backup_version, &rooms_and_ids)
.await
.expect("Failed to mark sessions as backed up");
}
// Create a MemoryStore containing the supplied number of sessions.
//
// Sessions are returned in alphabetical order of session id.
async fn store_with_sessions(
num_sessions: usize,
room_id: &RoomId,
) -> (MemoryStore, Vec<InboundGroupSession>) {
let (account, _) = get_account_and_session_test_helper();
let mut sessions = Vec::with_capacity(num_sessions);
for _ in 0..num_sessions {
sessions.push(new_session(&account, room_id).await);
}
sessions.sort_by_key(|s| s.session_id().to_owned());
let store = MemoryStore::new();
store.save_inbound_group_sessions(sessions.clone());
(store, sessions)
}
// Create a new InboundGroupSession
async fn new_session(account: &Account, room_id: &RoomId) -> InboundGroupSession {
let curve_key = "Nn0L2hkcCMFKqynTjyGsJbth7QrVmX3lbrksMkrGOAw";
let (outbound, _) = account.create_group_session_pair_with_defaults(room_id).await;
InboundGroupSession::new(
Curve25519PublicKey::from_base64(curve_key).unwrap(),
Ed25519PublicKey::from_base64("ee3Ek+J2LkkPmjGPGLhMxiKnhiX//xcqaVL4RP6EypE").unwrap(),
room_id,
&outbound.session_key().await,
outbound.settings().algorithm.to_owned(),
None,
)
.unwrap()
}
/// Find the session_id and backed_up_to value for each of the sessions in
/// the store.
async fn backed_up_tos(store: &MemoryStore) -> HashMap<SessionId, String> {
store
.get_inbound_group_sessions_and_backed_up_to()
.await
.expect("Unable to get inbound group sessions and backup order")
.iter()
.map(|(s, o)| {
(
s.session_id().to_owned(),
o.as_ref().map(|v| v.as_str().to_owned()).unwrap_or("".to_owned()),
)
})
.collect()
}
}
#[cfg(test)]