feat(sdk): Remove txn_id from requests sent by SlidingSync.

We were using `txn_id` as a way to identify the running stream. Now
things are cleaner so we can remove this “debug tool”. This class of
problems should not appear anymore. For the record, the biggest problem
was the following: It was possible to start multiple `stream`s at the
same time, and thus a stream could receive a response sent by another
stream. Since we no longer need to restart SlidingSync anymore (i.e. no
need to start multiple `stream`s at the same time), this problem should
not happen.
This commit is contained in:
Ivan Enderlin
2023-05-15 15:16:40 +02:00
parent 94d5d5187f
commit caaeb8130d

View File

@@ -60,7 +60,6 @@ use tokio::{
};
use tracing::{debug, error, instrument, warn, Instrument, Span};
use url::Url;
use uuid::Uuid;
use crate::{config::RequestConfig, Client, Result};
@@ -396,7 +395,7 @@ impl SlidingSync {
}
#[instrument(skip_all, fields(pos))]
async fn sync_once(&self, stream_id: &str) -> Result<Option<UpdateSummary>> {
async fn sync_once(&self) -> Result<Option<UpdateSummary>> {
let (request, request_config) = {
// Collect requests for lists.
let mut requests_lists = BTreeMap::new();
@@ -433,9 +432,6 @@ impl SlidingSync {
assign!(v4::Request::new(), {
pos,
delta_token,
// We want to track whether the incoming response maps to this
// request. We use the (optional) `txn_id` field for that.
txn_id: Some(stream_id.to_owned()),
timeout: Some(timeout),
lists: requests_lists,
bump_event_types: self.inner.bump_event_types.clone(),
@@ -498,7 +494,6 @@ impl SlidingSync {
// That's why we are running the handling of the response in a spawned
// future that cannot be cancelled by anything.
let this = self.clone();
let stream_id = stream_id.to_owned();
// Spawn a new future to ensure that the code inside this future cannot be
// cancelled if this method is cancelled.
@@ -510,22 +505,6 @@ impl SlidingSync {
// `response_handling_lock`.
let response_handling_lock = this.response_handling_lock.lock().await;
match &response.txn_id {
None => {
error!(stream_id, "Sliding Sync has received an unexpected response: `txn_id` must match `stream_id`; it's missing");
}
Some(txn_id) if txn_id != &stream_id => {
error!(
stream_id,
txn_id,
"Sliding Sync has received an unexpected response: `txn_id` must match `stream_id`; they differ"
);
}
_ => {}
}
// Handle the response.
let updates = this.handle_response(response).await?;
@@ -549,10 +528,7 @@ impl SlidingSync {
#[allow(unknown_lints, clippy::let_with_type_underscore)] // triggered by instrument macro
#[instrument(name = "sync_stream", skip_all)]
pub fn stream(&self) -> impl Stream<Item = Result<UpdateSummary, crate::Error>> + '_ {
// Define a stream ID.
let stream_id = Uuid::new_v4().to_string();
debug!(?self.inner.extensions, stream_id, "About to run the sync stream");
debug!(?self.inner.extensions, "About to run the sync stream");
let sync_stream_span = Span::current();
@@ -581,7 +557,7 @@ impl SlidingSync {
}
}
update_summary = self.sync_once(&stream_id).instrument(sync_stream_span.clone()) => {
update_summary = self.sync_once().instrument(sync_stream_span.clone()) => {
match update_summary {
Ok(Some(updates)) => {
self.inner.reset_counter.store(0, Ordering::SeqCst);