mirror of
https://github.com/matrix-org/matrix-rust-sdk.git
synced 2026-05-06 06:53:32 -04:00
fix(MemoryStore): undeadlock timeline saving
This commit is contained in:
@@ -36,7 +36,7 @@ indexeddb_cryptostore = ["matrix-sdk-crypto/indexeddb_cryptostore"]
|
||||
[dependencies]
|
||||
async-stream = "0.3.2"
|
||||
chacha20poly1305 = { version = "0.9.0", optional = true }
|
||||
dashmap = "4.0.2"
|
||||
dashmap = "5.1.0"
|
||||
futures-core = "0.3.15"
|
||||
futures-util = { version = "0.3.15", default-features = false }
|
||||
futures-channel = "0.3.15"
|
||||
|
||||
@@ -284,10 +284,10 @@ impl MemoryStore {
|
||||
info!("Save new timeline batch from messages response for {}", room);
|
||||
}
|
||||
|
||||
let data = if timeline.limited {
|
||||
let mut delete_timeline = false;
|
||||
if timeline.limited {
|
||||
info!("Delete stored timeline for {} because the sync response was limited", room);
|
||||
self.room_timeline.remove(room);
|
||||
None
|
||||
delete_timeline = true
|
||||
} else if let Some(mut data) = self.room_timeline.get_mut(room) {
|
||||
if !timeline.sync && Some(&timeline.start) != data.end.as_ref() {
|
||||
// This should only happen when a developer adds a wrong timeline
|
||||
@@ -298,7 +298,6 @@ impl MemoryStore {
|
||||
}
|
||||
|
||||
// Check if the event already exists in the store
|
||||
let mut delete_timeline = false;
|
||||
for event in &timeline.events {
|
||||
if let Some(event_id) = event.event_id() {
|
||||
if data.event_id_to_position.contains_key(&event_id) {
|
||||
@@ -308,36 +307,39 @@ impl MemoryStore {
|
||||
}
|
||||
}
|
||||
|
||||
if delete_timeline {
|
||||
info!("Delete stored timeline for {} because of duplicated events", room);
|
||||
self.room_timeline.remove(room);
|
||||
None
|
||||
} else if timeline.sync {
|
||||
data.start = timeline.start.clone();
|
||||
Some(data)
|
||||
} else {
|
||||
data.end = timeline.end.clone();
|
||||
Some(data)
|
||||
if !delete_timeline {
|
||||
if timeline.sync {
|
||||
data.start = timeline.start.clone();
|
||||
} else {
|
||||
data.end = timeline.end.clone();
|
||||
}
|
||||
}
|
||||
} else {
|
||||
None
|
||||
};
|
||||
}
|
||||
|
||||
let mut data = &mut *if let Some(data) = data {
|
||||
data
|
||||
} else {
|
||||
let data = TimelineData {
|
||||
if delete_timeline {
|
||||
info!("Delete stored timeline for {} because of duplicated events", room);
|
||||
self.room_timeline.remove(room);
|
||||
}
|
||||
|
||||
let mut data = self.room_timeline.entry(room.to_owned()).or_insert_with(||
|
||||
TimelineData {
|
||||
start: timeline.start.clone(),
|
||||
end: timeline.end.clone(),
|
||||
..Default::default()
|
||||
};
|
||||
self.room_timeline.insert(room.to_owned(), data);
|
||||
self.room_timeline.get_mut(room).unwrap()
|
||||
};
|
||||
|
||||
// Create a copy of the events if the stream created via `room_timeline()` isn't
|
||||
// fully consumed
|
||||
let data_events = Arc::make_mut(&mut data.events);
|
||||
}
|
||||
);
|
||||
|
||||
let make_room_version = || self.room_info
|
||||
.get(room)
|
||||
.and_then(|info| {
|
||||
info.base_info
|
||||
.create
|
||||
.as_ref()
|
||||
.map(|event| event.room_version.clone())
|
||||
}).unwrap_or_else(|| {
|
||||
warn!("Unable to find the room version for {}, assume version 9", room);
|
||||
RoomVersionId::V9
|
||||
});
|
||||
|
||||
if timeline.sync {
|
||||
let mut room_version = None;
|
||||
@@ -347,21 +349,16 @@ impl MemoryStore {
|
||||
redaction,
|
||||
))) = event.event.deserialize()
|
||||
{
|
||||
if let Some(position) = data.event_id_to_position.get(&redaction.redacts) {
|
||||
if let Some(mut full_event) = data_events.get_mut(position) {
|
||||
let pos = match data.event_id_to_position.get(&redaction.redacts) {
|
||||
Some(pos) => Some(pos.clone()),
|
||||
None => None,
|
||||
};
|
||||
|
||||
if let Some(position) = pos{
|
||||
if let Some(mut full_event) = data.events.get_mut(&position.clone()) {
|
||||
let inner_event = full_event.event.deserialize()?;
|
||||
if room_version.is_none() {
|
||||
room_version = Some(self.room_info
|
||||
.get(room)
|
||||
.and_then(|info| {
|
||||
info.base_info
|
||||
.create
|
||||
.as_ref()
|
||||
.map(|event| event.room_version.clone())
|
||||
}).unwrap_or_else(|| {
|
||||
warn!("Unable to find the room version for {}, assume version 9", room);
|
||||
RoomVersionId::V9
|
||||
}));
|
||||
room_version = Some(make_room_version());
|
||||
}
|
||||
|
||||
full_event.event = Raw::new(&AnySyncRoomEvent::from(
|
||||
@@ -372,20 +369,22 @@ impl MemoryStore {
|
||||
}
|
||||
|
||||
data.start_position -= 1;
|
||||
let start_position = data.start_position.clone();
|
||||
// Only add event with id to the position map
|
||||
if let Some(event_id) = event.event_id() {
|
||||
data.event_id_to_position.insert(event_id, data.start_position);
|
||||
data.event_id_to_position.insert(event_id, start_position);
|
||||
}
|
||||
data_events.insert(data.start_position, event.to_owned());
|
||||
data.events.insert(start_position, event.to_owned());
|
||||
}
|
||||
} else {
|
||||
for event in timeline.events.iter() {
|
||||
data.end_position += 1;
|
||||
let end_position = data.end_position.clone();
|
||||
// Only add event with id to the position map
|
||||
if let Some(event_id) = event.event_id() {
|
||||
data.event_id_to_position.insert(event_id, data.end_position);
|
||||
data.event_id_to_position.insert(event_id, end_position);
|
||||
}
|
||||
data_events.insert(data.end_position, event.to_owned());
|
||||
data.events.insert(end_position, event.to_owned());
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -762,7 +761,7 @@ struct TimelineData {
|
||||
pub start_position: isize,
|
||||
pub end: Option<String>,
|
||||
pub end_position: isize,
|
||||
pub events: Arc<BTreeMap<isize, SyncRoomEvent>>,
|
||||
pub events: BTreeMap<isize, SyncRoomEvent>,
|
||||
pub event_id_to_position: HashMap<Box<EventId>, isize>,
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user