mirror of
https://github.com/matrix-org/matrix-rust-sdk.git
synced 2026-05-07 15:33:45 -04:00
fix(bindings): SlidingSync.sync returns an immediately cancellable TaskHandle
fix(bindings): `SlidingSync.sync` returns an immediately cancellable `TaskHandle`
This commit is contained in:
@@ -1,7 +1,4 @@
|
||||
use std::sync::{
|
||||
atomic::{AtomicBool, Ordering},
|
||||
Arc, RwLock,
|
||||
};
|
||||
use std::sync::{Arc, RwLock};
|
||||
|
||||
use anyhow::Context;
|
||||
use eyeball::Observable;
|
||||
@@ -20,7 +17,7 @@ pub use matrix_sdk::{
|
||||
SlidingSyncBuilder as MatrixSlidingSyncBuilder, SlidingSyncMode, SlidingSyncState,
|
||||
};
|
||||
use tokio::{sync::broadcast::error::RecvError, task::JoinHandle};
|
||||
use tracing::{debug, error, trace, warn};
|
||||
use tracing::{debug, error, warn};
|
||||
use url::Url;
|
||||
|
||||
use super::{Client, Room, RUNTIME};
|
||||
@@ -41,6 +38,7 @@ impl TaskHandle {
|
||||
Self { handle: Some(handle), callback: Default::default() }
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
fn with_callback(callback: TaskHandleCallback) -> Self {
|
||||
Self { handle: Default::default(), callback: RwLock::new(Some(callback)) }
|
||||
}
|
||||
@@ -722,43 +720,35 @@ impl SlidingSync {
|
||||
let inner = self.inner.clone();
|
||||
let client = self.client.clone();
|
||||
let observer = self.observer.clone();
|
||||
let stop_loop = Arc::new(AtomicBool::new(false));
|
||||
let remote_stopper = stop_loop.clone();
|
||||
|
||||
let stoppable = Arc::new(TaskHandle::with_callback(Box::new(move || {
|
||||
remote_stopper.store(true, Ordering::Relaxed);
|
||||
})));
|
||||
|
||||
RUNTIME.spawn(async move {
|
||||
Arc::new(TaskHandle::with_handle(RUNTIME.spawn(async move {
|
||||
let stream = inner.stream();
|
||||
pin_mut!(stream);
|
||||
|
||||
loop {
|
||||
let update = match stream.next().await {
|
||||
Some(Ok(u)) => u,
|
||||
Some(Err(e)) => {
|
||||
if client.process_sync_error(e) == LoopCtrl::Break {
|
||||
let update_summary = match stream.next().await {
|
||||
Some(Ok(update_summary)) => update_summary,
|
||||
|
||||
Some(Err(error)) => {
|
||||
if client.process_sync_error(error) == LoopCtrl::Break {
|
||||
warn!("loop was stopped by client error processing");
|
||||
break;
|
||||
} else {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
None => {
|
||||
warn!("Inner streaming loop ended unexpectedly");
|
||||
break;
|
||||
}
|
||||
};
|
||||
|
||||
if let Some(ref observer) = *observer.read().unwrap() {
|
||||
observer.did_receive_sync_update(update.into());
|
||||
}
|
||||
if stop_loop.load(Ordering::Relaxed) {
|
||||
trace!("stopped sync loop after cancellation");
|
||||
break;
|
||||
observer.did_receive_sync_update(update_summary.into());
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
stoppable
|
||||
})))
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
use std::{
|
||||
collections::BTreeMap,
|
||||
fmt::Debug,
|
||||
sync::{Arc, Mutex, RwLock as StdRwLock},
|
||||
sync::{Mutex, RwLock as StdRwLock},
|
||||
};
|
||||
|
||||
use eyeball::Observable;
|
||||
@@ -16,8 +16,8 @@ use tracing::trace;
|
||||
use url::Url;
|
||||
|
||||
use super::{
|
||||
Error, FrozenSlidingSync, FrozenSlidingSyncList, SlidingSync, SlidingSyncList,
|
||||
SlidingSyncListBuilder, SlidingSyncRoom,
|
||||
Error, FrozenSlidingSync, FrozenSlidingSyncList, SlidingSync, SlidingSyncInner,
|
||||
SlidingSyncList, SlidingSyncListBuilder, SlidingSyncRoom,
|
||||
};
|
||||
use crate::{Client, Result};
|
||||
|
||||
@@ -286,10 +286,10 @@ impl SlidingSyncBuilder {
|
||||
|
||||
trace!(len = rooms_found.len(), "rooms unfrozen");
|
||||
|
||||
let rooms = Arc::new(StdRwLock::new(rooms_found));
|
||||
let lists = Arc::new(StdRwLock::new(self.lists));
|
||||
let rooms = StdRwLock::new(rooms_found);
|
||||
let lists = StdRwLock::new(self.lists);
|
||||
|
||||
Ok(SlidingSync {
|
||||
Ok(SlidingSync::new(SlidingSyncInner {
|
||||
homeserver: self.homeserver,
|
||||
client,
|
||||
storage_key: self.storage_key,
|
||||
@@ -297,13 +297,13 @@ impl SlidingSyncBuilder {
|
||||
lists,
|
||||
rooms,
|
||||
|
||||
extensions: Mutex::new(self.extensions).into(),
|
||||
extensions: Mutex::new(self.extensions),
|
||||
reset_counter: Default::default(),
|
||||
|
||||
pos: Arc::new(StdRwLock::new(Observable::new(None))),
|
||||
delta_token: Arc::new(StdRwLock::new(Observable::new(delta_token_inner))),
|
||||
subscriptions: Arc::new(StdRwLock::new(self.subscriptions)),
|
||||
pos: StdRwLock::new(Observable::new(None)),
|
||||
delta_token: StdRwLock::new(Observable::new(delta_token_inner)),
|
||||
subscriptions: StdRwLock::new(self.subscriptions),
|
||||
unsubscribe: Default::default(),
|
||||
})
|
||||
}))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -597,6 +597,7 @@ mod list;
|
||||
mod room;
|
||||
|
||||
use std::{
|
||||
borrow::BorrowMut,
|
||||
collections::BTreeMap,
|
||||
fmt::Debug,
|
||||
mem,
|
||||
@@ -613,6 +614,8 @@ pub use error::*;
|
||||
use eyeball::Observable;
|
||||
use futures_core::stream::Stream;
|
||||
pub use list::*;
|
||||
use matrix_sdk_base::sync::SyncResponse;
|
||||
use matrix_sdk_common::locks::Mutex as AsyncMutex;
|
||||
pub use room::*;
|
||||
use ruma::{
|
||||
api::client::{
|
||||
@@ -624,6 +627,7 @@ use ruma::{
|
||||
assign, OwnedRoomId, RoomId,
|
||||
};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tokio::spawn;
|
||||
use tracing::{debug, error, info_span, instrument, trace, warn, Instrument, Span};
|
||||
use url::Url;
|
||||
use uuid::Uuid;
|
||||
@@ -640,8 +644,19 @@ use crate::{config::RequestConfig, Client, Result};
|
||||
const MAXIMUM_SLIDING_SYNC_SESSION_EXPIRATION: u8 = 3;
|
||||
|
||||
/// The Sliding Sync instance.
|
||||
///
|
||||
/// It is OK to clone this type as much as you need: cloning it is cheap.
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct SlidingSync {
|
||||
/// The Sliding Sync data.
|
||||
inner: Arc<SlidingSyncInner>,
|
||||
|
||||
/// A lock to ensure that responses are handled one at a time.
|
||||
response_handling_lock: Arc<AsyncMutex<()>>,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(super) struct SlidingSyncInner {
|
||||
/// Customize the homeserver for sliding sync only
|
||||
homeserver: Option<Url>,
|
||||
|
||||
@@ -652,55 +667,37 @@ pub struct SlidingSync {
|
||||
storage_key: Option<String>,
|
||||
|
||||
/// The `pos` marker.
|
||||
pos: Arc<StdRwLock<Observable<Option<String>>>>,
|
||||
pos: StdRwLock<Observable<Option<String>>>,
|
||||
|
||||
delta_token: Arc<StdRwLock<Observable<Option<String>>>>,
|
||||
delta_token: StdRwLock<Observable<Option<String>>>,
|
||||
|
||||
/// The lists of this Sliding Sync instance.
|
||||
lists: Arc<StdRwLock<BTreeMap<String, SlidingSyncList>>>,
|
||||
lists: StdRwLock<BTreeMap<String, SlidingSyncList>>,
|
||||
|
||||
/// The rooms details
|
||||
rooms: Arc<StdRwLock<BTreeMap<OwnedRoomId, SlidingSyncRoom>>>,
|
||||
rooms: StdRwLock<BTreeMap<OwnedRoomId, SlidingSyncRoom>>,
|
||||
|
||||
subscriptions: Arc<StdRwLock<BTreeMap<OwnedRoomId, v4::RoomSubscription>>>,
|
||||
unsubscribe: Arc<StdRwLock<Vec<OwnedRoomId>>>,
|
||||
subscriptions: StdRwLock<BTreeMap<OwnedRoomId, v4::RoomSubscription>>,
|
||||
unsubscribe: StdRwLock<Vec<OwnedRoomId>>,
|
||||
|
||||
/// Number of times a Sliding Session session has been reset.
|
||||
reset_counter: Arc<AtomicU8>,
|
||||
reset_counter: AtomicU8,
|
||||
|
||||
/// the intended state of the extensions being supplied to sliding /sync
|
||||
/// calls. May contain the latest next_batch for to_devices, etc.
|
||||
extensions: Arc<Mutex<Option<ExtensionsConfig>>>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
struct FrozenSlidingSync {
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
to_device_since: Option<String>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
delta_token: Option<String>,
|
||||
}
|
||||
|
||||
impl From<&SlidingSync> for FrozenSlidingSync {
|
||||
fn from(v: &SlidingSync) -> Self {
|
||||
FrozenSlidingSync {
|
||||
delta_token: v.delta_token.read().unwrap().clone(),
|
||||
to_device_since: v
|
||||
.extensions
|
||||
.lock()
|
||||
.unwrap()
|
||||
.as_ref()
|
||||
.and_then(|ext| ext.to_device.as_ref()?.since.clone()),
|
||||
}
|
||||
}
|
||||
extensions: Mutex<Option<ExtensionsConfig>>,
|
||||
}
|
||||
|
||||
impl SlidingSync {
|
||||
pub(super) fn new(inner: SlidingSyncInner) -> Self {
|
||||
Self { inner: Arc::new(inner), response_handling_lock: Arc::new(AsyncMutex::new(())) }
|
||||
}
|
||||
|
||||
async fn cache_to_storage(&self) -> Result<(), crate::Error> {
|
||||
let Some(storage_key) = self.storage_key.as_ref() else { return Ok(()) };
|
||||
let Some(storage_key) = self.inner.storage_key.as_ref() else { return Ok(()) };
|
||||
trace!(storage_key, "Saving to storage for later use");
|
||||
|
||||
let store = self.client.store();
|
||||
let store = self.inner.client.store();
|
||||
|
||||
// Write this `SlidingSync` instance, as a `FrozenSlidingSync` instance, inside
|
||||
// the client store.
|
||||
@@ -713,9 +710,10 @@ impl SlidingSync {
|
||||
|
||||
// Write every `SlidingSyncList` inside the client the store.
|
||||
let frozen_lists = {
|
||||
let rooms_lock = self.rooms.read().unwrap();
|
||||
let rooms_lock = self.inner.rooms.read().unwrap();
|
||||
|
||||
self.lists
|
||||
self.inner
|
||||
.lists
|
||||
.read()
|
||||
.unwrap()
|
||||
.iter()
|
||||
@@ -746,16 +744,16 @@ impl SlidingSync {
|
||||
/// lists but without the current state.
|
||||
pub fn new_builder_copy(&self) -> SlidingSyncBuilder {
|
||||
let mut builder = Self::builder()
|
||||
.client(self.client.clone())
|
||||
.subscriptions(self.subscriptions.read().unwrap().to_owned());
|
||||
.client(self.inner.client.clone())
|
||||
.subscriptions(self.inner.subscriptions.read().unwrap().to_owned());
|
||||
|
||||
for list in self.lists.read().unwrap().values().map(|list| {
|
||||
for list in self.inner.lists.read().unwrap().values().map(|list| {
|
||||
list.new_builder().build().expect("builder worked before, builder works now")
|
||||
}) {
|
||||
builder = builder.add_list(list);
|
||||
}
|
||||
|
||||
if let Some(homeserver) = &self.homeserver {
|
||||
if let Some(homeserver) = &self.inner.homeserver {
|
||||
builder.homeserver(homeserver.clone())
|
||||
} else {
|
||||
builder
|
||||
@@ -768,7 +766,7 @@ impl SlidingSync {
|
||||
/// poll the stream after you've altered this. If you do that during, it
|
||||
/// might take one round trip to take effect.
|
||||
pub fn subscribe(&self, room_id: OwnedRoomId, settings: Option<v4::RoomSubscription>) {
|
||||
self.subscriptions.write().unwrap().insert(room_id, settings.unwrap_or_default());
|
||||
self.inner.subscriptions.write().unwrap().insert(room_id, settings.unwrap_or_default());
|
||||
}
|
||||
|
||||
/// Unsubscribe from a given room.
|
||||
@@ -777,14 +775,14 @@ impl SlidingSync {
|
||||
/// poll the stream after you've altered this. If you do that during, it
|
||||
/// might take one round trip to take effect.
|
||||
pub fn unsubscribe(&self, room_id: OwnedRoomId) {
|
||||
if self.subscriptions.write().unwrap().remove(&room_id).is_some() {
|
||||
self.unsubscribe.write().unwrap().push(room_id);
|
||||
if self.inner.subscriptions.write().unwrap().remove(&room_id).is_some() {
|
||||
self.inner.unsubscribe.write().unwrap().push(room_id);
|
||||
}
|
||||
}
|
||||
|
||||
/// Add the common extensions if not already configured.
|
||||
pub fn add_common_extensions(&self) {
|
||||
let mut lock = self.extensions.lock().unwrap();
|
||||
let mut lock = self.inner.extensions.lock().unwrap();
|
||||
let mut cfg = lock.get_or_insert_with(Default::default);
|
||||
|
||||
if cfg.to_device.is_none() {
|
||||
@@ -802,18 +800,19 @@ impl SlidingSync {
|
||||
|
||||
/// Lookup a specific room
|
||||
pub fn get_room(&self, room_id: &RoomId) -> Option<SlidingSyncRoom> {
|
||||
self.rooms.read().unwrap().get(room_id).cloned()
|
||||
self.inner.rooms.read().unwrap().get(room_id).cloned()
|
||||
}
|
||||
|
||||
/// Check the number of rooms.
|
||||
pub fn get_number_of_rooms(&self) -> usize {
|
||||
self.rooms.read().unwrap().len()
|
||||
self.inner.rooms.read().unwrap().len()
|
||||
}
|
||||
|
||||
fn update_to_device_since(&self, since: String) {
|
||||
// FIXME: Find a better place where the to-device since token should be
|
||||
// persisted.
|
||||
self.extensions
|
||||
self.inner
|
||||
.extensions
|
||||
.lock()
|
||||
.unwrap()
|
||||
.get_or_insert_with(Default::default)
|
||||
@@ -828,7 +827,7 @@ impl SlidingSync {
|
||||
/// listening to the stream and is therefor not necessarily up to date
|
||||
/// with the lists used for the stream.
|
||||
pub fn list(&self, list_name: &str) -> Option<SlidingSyncList> {
|
||||
self.lists.read().unwrap().get(list_name).cloned()
|
||||
self.inner.lists.read().unwrap().get(list_name).cloned()
|
||||
}
|
||||
|
||||
/// Remove the SlidingSyncList named `list_name` from the lists list if
|
||||
@@ -838,7 +837,7 @@ impl SlidingSync {
|
||||
/// stream created after this. The old stream will still continue to use the
|
||||
/// previous set of lists.
|
||||
pub fn pop_list(&self, list_name: &String) -> Option<SlidingSyncList> {
|
||||
self.lists.write().unwrap().remove(list_name)
|
||||
self.inner.lists.write().unwrap().remove(list_name)
|
||||
}
|
||||
|
||||
/// Add the list to the list of lists.
|
||||
@@ -851,7 +850,7 @@ impl SlidingSync {
|
||||
/// stream created after this. The old stream will still continue to use the
|
||||
/// previous set of lists.
|
||||
pub fn add_list(&self, list: SlidingSyncList) -> Option<SlidingSyncList> {
|
||||
self.lists.write().unwrap().insert(list.name.clone(), list)
|
||||
self.inner.lists.write().unwrap().insert(list.name.clone(), list)
|
||||
}
|
||||
|
||||
/// Lookup a set of rooms
|
||||
@@ -859,14 +858,14 @@ impl SlidingSync {
|
||||
&self,
|
||||
room_ids: I,
|
||||
) -> Vec<Option<SlidingSyncRoom>> {
|
||||
let rooms = self.rooms.read().unwrap();
|
||||
let rooms = self.inner.rooms.read().unwrap();
|
||||
|
||||
room_ids.map(|room_id| rooms.get(&room_id).cloned()).collect()
|
||||
}
|
||||
|
||||
/// Get all rooms.
|
||||
pub fn get_all_rooms(&self) -> Vec<SlidingSyncRoom> {
|
||||
self.rooms.read().unwrap().values().cloned().collect()
|
||||
self.inner.rooms.read().unwrap().values().cloned().collect()
|
||||
}
|
||||
|
||||
fn prepare_extension_config(&self, pos: Option<&str>) -> ExtensionsConfig {
|
||||
@@ -874,7 +873,7 @@ impl SlidingSync {
|
||||
// The pos is `None`, it's either our initial sync or the proxy forgot about us
|
||||
// and sent us an `UnknownPos` error. We need to send out the config for our
|
||||
// extensions.
|
||||
let mut extensions = self.extensions.lock().unwrap().clone().unwrap_or_default();
|
||||
let mut extensions = self.inner.extensions.lock().unwrap().clone().unwrap_or_default();
|
||||
|
||||
// Always enable to-device events and the e2ee-extension on the initial request,
|
||||
// no matter what the caller wants.
|
||||
@@ -897,6 +896,7 @@ impl SlidingSync {
|
||||
// We already enabled all the things, just fetch out the to-device since token
|
||||
// out of self.extensions and set it in a new, and empty, `ExtensionsConfig`.
|
||||
let since = self
|
||||
.inner
|
||||
.extensions
|
||||
.lock()
|
||||
.unwrap()
|
||||
@@ -912,44 +912,21 @@ impl SlidingSync {
|
||||
|
||||
/// Handle the HTTP response.
|
||||
#[instrument(skip_all, fields(lists = list_generators.len()))]
|
||||
async fn handle_response(
|
||||
fn handle_response(
|
||||
&self,
|
||||
sliding_sync_response: v4::Response,
|
||||
stream_id: &str,
|
||||
mut sync_response: SyncResponse,
|
||||
list_generators: &mut BTreeMap<String, SlidingSyncListRequestGenerator>,
|
||||
) -> Result<UpdateSummary, crate::Error> {
|
||||
match &sliding_sync_response.txn_id {
|
||||
None => {
|
||||
error!(stream_id, "Sliding Sync has received an unexpected response: `txn_id` must match `stream_id`; it's missing");
|
||||
}
|
||||
|
||||
Some(txn_id) if txn_id != stream_id => {
|
||||
error!(
|
||||
stream_id,
|
||||
txn_id,
|
||||
"Sliding Sync has received an unexpected response: `txn_id` must match `stream_id`; they differ"
|
||||
);
|
||||
}
|
||||
|
||||
_ => {}
|
||||
}
|
||||
|
||||
// Handle and transform a Sliding Sync Response to a `SyncResponse`.
|
||||
//
|
||||
// We may not need the `sync_response` in the future (once `SyncResponse` will
|
||||
// move to Sliding Sync, i.e. to `v4::Response`), but processing the
|
||||
// `sliding_sync_response` is vital, so it must be done somewhere; for now it
|
||||
// happens here.
|
||||
let mut sync_response = self.client.process_sliding_sync(&sliding_sync_response).await?;
|
||||
|
||||
debug!("Sliding sync response has been processed");
|
||||
|
||||
Observable::set(&mut self.pos.write().unwrap(), Some(sliding_sync_response.pos));
|
||||
Observable::set(&mut self.delta_token.write().unwrap(), sliding_sync_response.delta_token);
|
||||
Observable::set(&mut self.inner.pos.write().unwrap(), Some(sliding_sync_response.pos));
|
||||
Observable::set(
|
||||
&mut self.inner.delta_token.write().unwrap(),
|
||||
sliding_sync_response.delta_token,
|
||||
);
|
||||
|
||||
let update_summary = {
|
||||
let mut rooms = Vec::new();
|
||||
let mut rooms_map = self.rooms.write().unwrap();
|
||||
let mut rooms_map = self.inner.rooms.write().unwrap();
|
||||
|
||||
for (room_id, mut room_data) in sliding_sync_response.rooms.into_iter() {
|
||||
// `sync_response` contains the rooms with decrypted events if any, so look at
|
||||
@@ -973,7 +950,7 @@ impl SlidingSync {
|
||||
rooms_map.insert(
|
||||
room_id.clone(),
|
||||
SlidingSyncRoom::new(
|
||||
self.client.clone(),
|
||||
self.inner.client.clone(),
|
||||
room_id.clone(),
|
||||
room_data,
|
||||
timeline,
|
||||
@@ -1009,19 +986,19 @@ impl SlidingSync {
|
||||
UpdateSummary { lists: updated_lists, rooms }
|
||||
};
|
||||
|
||||
self.cache_to_storage().await?;
|
||||
|
||||
Ok(update_summary)
|
||||
}
|
||||
|
||||
async fn sync_once(
|
||||
&self,
|
||||
stream_id: &str,
|
||||
list_generators: &mut BTreeMap<String, SlidingSyncListRequestGenerator>,
|
||||
list_generators: Arc<Mutex<BTreeMap<String, SlidingSyncListRequestGenerator>>>,
|
||||
) -> Result<Option<UpdateSummary>> {
|
||||
let mut lists = BTreeMap::new();
|
||||
|
||||
{
|
||||
let mut list_generators_lock = list_generators.lock().unwrap();
|
||||
let list_generators = list_generators_lock.borrow_mut();
|
||||
let mut lists_to_remove = Vec::new();
|
||||
|
||||
for (name, generator) in list_generators.iter_mut() {
|
||||
@@ -1035,16 +1012,16 @@ impl SlidingSync {
|
||||
for list_name in lists_to_remove {
|
||||
list_generators.remove(&list_name);
|
||||
}
|
||||
|
||||
if list_generators.is_empty() {
|
||||
return Ok(None);
|
||||
}
|
||||
}
|
||||
|
||||
if list_generators.is_empty() {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
let pos = self.pos.read().unwrap().clone();
|
||||
let delta_token = self.delta_token.read().unwrap().clone();
|
||||
let room_subscriptions = self.subscriptions.read().unwrap().clone();
|
||||
let unsubscribe_rooms = mem::take(&mut *self.unsubscribe.write().unwrap());
|
||||
let pos = self.inner.pos.read().unwrap().clone();
|
||||
let delta_token = self.inner.delta_token.read().unwrap().clone();
|
||||
let room_subscriptions = self.inner.subscriptions.read().unwrap().clone();
|
||||
let unsubscribe_rooms = mem::take(&mut *self.inner.unsubscribe.write().unwrap());
|
||||
let timeout = Duration::from_secs(30);
|
||||
let extensions = self.prepare_extension_config(pos.as_deref());
|
||||
|
||||
@@ -1055,7 +1032,7 @@ impl SlidingSync {
|
||||
let request_config = RequestConfig::default().timeout(timeout + Duration::from_secs(30));
|
||||
|
||||
// Prepare the request.
|
||||
let request = self.client.send_with_homeserver(
|
||||
let request = self.inner.client.send_with_homeserver(
|
||||
assign!(v4::Request::new(), {
|
||||
pos,
|
||||
delta_token,
|
||||
@@ -1069,7 +1046,7 @@ impl SlidingSync {
|
||||
extensions,
|
||||
}),
|
||||
Some(request_config),
|
||||
self.homeserver.as_ref().map(ToString::to_string),
|
||||
self.inner.homeserver.as_ref().map(ToString::to_string),
|
||||
);
|
||||
|
||||
// Send the request and get a response with end-to-end encryption support.
|
||||
@@ -1084,7 +1061,8 @@ impl SlidingSync {
|
||||
#[cfg(feature = "e2e-encryption")]
|
||||
let response = {
|
||||
let (e2ee_uploads, response) =
|
||||
futures_util::future::join(self.client.send_outgoing_requests(), request).await;
|
||||
futures_util::future::join(self.inner.client.send_outgoing_requests(), request)
|
||||
.await;
|
||||
|
||||
if let Err(error) = e2ee_uploads {
|
||||
error!(?error, "Error while sending outgoing E2EE requests");
|
||||
@@ -1097,25 +1075,76 @@ impl SlidingSync {
|
||||
#[cfg(not(feature = "e2e-encryption"))]
|
||||
let response = request.await?;
|
||||
|
||||
debug!("Sliding sync response received");
|
||||
// At this point, the request has been sent, and a response has been received.
|
||||
//
|
||||
// We must ensure the handling of the response cannot be stopped/
|
||||
// cancelled. It must be done entirely, otherwise we can have
|
||||
// corrupted/incomplete states for Sliding Sync and other parts of
|
||||
// the code.
|
||||
//
|
||||
// That's why we are running the handling of the response in a spawned
|
||||
// future that cannot be cancelled by anything.
|
||||
let this = self.clone();
|
||||
let stream_id = stream_id.to_owned();
|
||||
|
||||
let updates = self.handle_response(response, stream_id, list_generators).await?;
|
||||
// Spawn a new future to ensure that the code inside this future cannot be
|
||||
// cancelled if this method is cancelled.
|
||||
spawn(async move {
|
||||
debug!("Sliding sync response received");
|
||||
|
||||
debug!("Sliding sync response has been handled");
|
||||
// In case the task running this future is detached, we must be
|
||||
// ensure responses are handled one at a time, hence we lock the
|
||||
// `response_handling_lock`.
|
||||
let global_lock = this.response_handling_lock.lock().await;
|
||||
|
||||
Ok(Some(updates))
|
||||
match &response.txn_id {
|
||||
None => {
|
||||
error!(stream_id, "Sliding Sync has received an unexpected response: `txn_id` must match `stream_id`; it's missing");
|
||||
}
|
||||
|
||||
Some(txn_id) if txn_id != &stream_id => {
|
||||
error!(
|
||||
stream_id,
|
||||
txn_id,
|
||||
"Sliding Sync has received an unexpected response: `txn_id` must match `stream_id`; they differ"
|
||||
);
|
||||
}
|
||||
|
||||
_ => {}
|
||||
}
|
||||
|
||||
// Handle and transform a Sliding Sync Response to a `SyncResponse`.
|
||||
//
|
||||
// We may not need the `sync_response` in the future (once `SyncResponse` will
|
||||
// move to Sliding Sync, i.e. to `v4::Response`), but processing the
|
||||
// `sliding_sync_response` is vital, so it must be done somewhere; for now it
|
||||
// happens here.
|
||||
let sync_response = this.inner.client.process_sliding_sync(&response).await?;
|
||||
|
||||
debug!("Sliding sync response has been processed");
|
||||
|
||||
let updates = this.handle_response(response, sync_response, list_generators.lock().unwrap().borrow_mut())?;
|
||||
|
||||
this.cache_to_storage().await?;
|
||||
|
||||
drop(global_lock);
|
||||
|
||||
debug!("Sliding sync response has been handled");
|
||||
|
||||
Ok(Some(updates))
|
||||
}).await.unwrap()
|
||||
}
|
||||
|
||||
/// Create a _new_ Sliding Sync stream.
|
||||
///
|
||||
/// This stream will send requests and will handle responses automatically,
|
||||
/// hence updating the lists.
|
||||
#[instrument(name = "sync_stream", skip_all, parent = &self.client.root_span)]
|
||||
#[instrument(name = "sync_stream", skip_all, parent = &self.inner.client.root_span)]
|
||||
pub fn stream(&self) -> impl Stream<Item = Result<UpdateSummary, crate::Error>> + '_ {
|
||||
// Collect all the lists that need to be updated.
|
||||
let mut list_generators = {
|
||||
let list_generators = {
|
||||
let mut list_generators = BTreeMap::new();
|
||||
let lock = self.lists.read().unwrap();
|
||||
let lock = self.inner.lists.read().unwrap();
|
||||
|
||||
for (name, lists) in lock.iter() {
|
||||
list_generators.insert(name.clone(), lists.request_generator());
|
||||
@@ -1126,21 +1155,22 @@ impl SlidingSync {
|
||||
|
||||
let stream_id = Uuid::new_v4().to_string();
|
||||
|
||||
debug!(?self.extensions, stream_id, "About to run the sync stream");
|
||||
debug!(?self.inner.extensions, stream_id, "About to run the sync stream");
|
||||
|
||||
let instrument_span = Span::current();
|
||||
let list_generators = Arc::new(Mutex::new(list_generators));
|
||||
|
||||
async_stream::stream! {
|
||||
loop {
|
||||
let sync_span = info_span!(parent: &instrument_span, "sync_once");
|
||||
|
||||
sync_span.in_scope(|| {
|
||||
debug!(?self.extensions, "Sync stream loop is running");
|
||||
debug!(?self.inner.extensions, "Sync stream loop is running");
|
||||
});
|
||||
|
||||
match self.sync_once(&stream_id, &mut list_generators).instrument(sync_span.clone()).await {
|
||||
match self.sync_once(&stream_id, list_generators.clone()).instrument(sync_span.clone()).await {
|
||||
Ok(Some(updates)) => {
|
||||
self.reset_counter.store(0, Ordering::SeqCst);
|
||||
self.inner.reset_counter.store(0, Ordering::SeqCst);
|
||||
|
||||
yield Ok(updates);
|
||||
}
|
||||
@@ -1154,7 +1184,7 @@ impl SlidingSync {
|
||||
// The session has expired.
|
||||
|
||||
// Has it expired too many times?
|
||||
if self.reset_counter.fetch_add(1, Ordering::SeqCst) >= MAXIMUM_SLIDING_SYNC_SESSION_EXPIRATION {
|
||||
if self.inner.reset_counter.fetch_add(1, Ordering::SeqCst) >= MAXIMUM_SLIDING_SYNC_SESSION_EXPIRATION {
|
||||
sync_span.in_scope(|| error!("Session expired {MAXIMUM_SLIDING_SYNC_SESSION_EXPIRATION} times in a row"));
|
||||
|
||||
// The session has expired too many times, let's raise an error!
|
||||
@@ -1168,9 +1198,9 @@ impl SlidingSync {
|
||||
warn!("Session expired. Restarting Sliding Sync.");
|
||||
|
||||
// To “restart” a Sliding Sync session, we set `pos` to its initial value.
|
||||
Observable::set(&mut self.pos.write().unwrap(), None);
|
||||
Observable::set(&mut self.inner.pos.write().unwrap(), None);
|
||||
|
||||
debug!(?self.extensions, "Sliding Sync has been reset");
|
||||
debug!(?self.inner.extensions, "Sliding Sync has been reset");
|
||||
});
|
||||
}
|
||||
|
||||
@@ -1188,12 +1218,35 @@ impl SlidingSync {
|
||||
impl SlidingSync {
|
||||
/// Get a copy of the `pos` value.
|
||||
pub fn pos(&self) -> Option<String> {
|
||||
self.pos.read().unwrap().clone()
|
||||
self.inner.pos.read().unwrap().clone()
|
||||
}
|
||||
|
||||
/// Set a new value for `pos`.
|
||||
pub fn set_pos(&self, new_pos: String) {
|
||||
Observable::set(&mut self.pos.write().unwrap(), Some(new_pos));
|
||||
Observable::set(&mut self.inner.pos.write().unwrap(), Some(new_pos));
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
struct FrozenSlidingSync {
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
to_device_since: Option<String>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
delta_token: Option<String>,
|
||||
}
|
||||
|
||||
impl From<&SlidingSync> for FrozenSlidingSync {
|
||||
fn from(sliding_sync: &SlidingSync) -> Self {
|
||||
FrozenSlidingSync {
|
||||
delta_token: sliding_sync.inner.delta_token.read().unwrap().clone(),
|
||||
to_device_since: sliding_sync
|
||||
.inner
|
||||
.extensions
|
||||
.lock()
|
||||
.unwrap()
|
||||
.as_ref()
|
||||
.and_then(|ext| ext.to_device.as_ref()?.since.clone()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user