From ec00af0bca5989741fd31b91e6e4ff610ed6cf99 Mon Sep 17 00:00:00 2001 From: Johannes Becker Date: Wed, 13 Jul 2022 10:11:43 +0200 Subject: [PATCH] refactor(appservice)!: Improve API and cleanup docs --- .../examples/appservice_autojoin.rs | 8 +- .../src/event_handler.rs | 14 + crates/matrix-sdk-appservice/src/lib.rs | 494 ++++-------------- .../matrix-sdk-appservice/src/registration.rs | 124 +++++ .../matrix-sdk-appservice/src/virtual_user.rs | 149 ++++++ crates/matrix-sdk-appservice/tests/tests.rs | 15 +- 6 files changed, 396 insertions(+), 408 deletions(-) create mode 100644 crates/matrix-sdk-appservice/src/registration.rs create mode 100644 crates/matrix-sdk-appservice/src/virtual_user.rs diff --git a/crates/matrix-sdk-appservice/examples/appservice_autojoin.rs b/crates/matrix-sdk-appservice/examples/appservice_autojoin.rs index 0964ef528..e1c295af6 100644 --- a/crates/matrix-sdk-appservice/examples/appservice_autojoin.rs +++ b/crates/matrix-sdk-appservice/examples/appservice_autojoin.rs @@ -31,7 +31,7 @@ pub async fn handle_room_member( error_if_user_not_in_use(error)?; } - let client = appservice.virtual_user_client(user_id.localpart()).await?; + let client = appservice.virtual_user(Some(user_id.localpart())).await?; client.join_room_by_id(room.room_id()).await?; } @@ -61,7 +61,9 @@ pub async fn main() -> Result<(), Box> { let appservice = AppService::new(homeserver_url, server_name, registration).await?; appservice.register_user_query(Box::new(|_, _| Box::pin(async { true }))).await; appservice - .register_event_handler_context(appservice.clone())? + .virtual_user(None) + .await? + .register_event_handler_context(appservice.clone()) .register_event_handler( move |event: OriginalSyncRoomMemberEvent, room: Room, @@ -69,7 +71,7 @@ pub async fn main() -> Result<(), Box> { handle_room_member(appservice, room, event) }, ) - .await?; + .await; let (host, port) = appservice.registration().get_host_and_port()?; appservice.run(host, port).await?; diff --git a/crates/matrix-sdk-appservice/src/event_handler.rs b/crates/matrix-sdk-appservice/src/event_handler.rs index 0e1d1a73c..7d86a5331 100644 --- a/crates/matrix-sdk-appservice/src/event_handler.rs +++ b/crates/matrix-sdk-appservice/src/event_handler.rs @@ -1,3 +1,17 @@ +// Copyright 2022 Famedly GmbH +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + use std::{future::Future, pin::Pin, sync::Arc}; use matrix_sdk::locks::Mutex; diff --git a/crates/matrix-sdk-appservice/src/lib.rs b/crates/matrix-sdk-appservice/src/lib.rs index 5060f2b8e..f84e9ce70 100644 --- a/crates/matrix-sdk-appservice/src/lib.rs +++ b/crates/matrix-sdk-appservice/src/lib.rs @@ -17,12 +17,13 @@ //! The appservice crate aims to provide a batteries-included experience by //! being a thin wrapper around the [`matrix_sdk`]. That means that we //! -//! * ship with functionality to configure your webserver crate or simply run -//! the webserver for you -//! * receive and validate requests from the homeserver correctly -//! * allow calling the homeserver with proper virtual user identity assertion -//! * have consistent room state by leveraging matrix-sdk's state store -//! * provide E2EE support by leveraging matrix-sdk's crypto store +//! - [x] ship with functionality to configure your webserver crate or simply +//! run the webserver for you +//! - [x] receive and validate requests from the homeserver correctly +//! - [x] allow calling the homeserver with proper virtual user identity +//! assertion +//! - [x] have consistent room state by leveraging matrix-sdk's state store +//! - [ ] provide E2EE support by leveraging matrix-sdk's crypto store //! //! # Status //! @@ -31,10 +32,10 @@ //! //! # Registration //! -//! The crate relies on the registration being always in sync with the actual -//! registration used by the homeserver. That's because it's required for the -//! access tokens and because membership states for virtual users are determined -//! based on the registered namespace. +//! The crate relies on the appservice registration being always in sync with +//! the actual registration used by the homeserver. That's because it's required +//! for the access tokens and because membership states for virtual users are +//! determined based on the registered namespaces. //! //! **Note:** Non-exclusive registration namespaces are not yet supported and //! hence might lead to undefined behavior. @@ -65,9 +66,13 @@ //! ")?; //! //! let mut appservice = AppService::new(homeserver_url, server_name, registration).await?; -//! appservice.register_event_handler(|_ev: SyncRoomMemberEvent| async { -//! // do stuff -//! }); +//! appservice +//! .virtual_user(None) +//! .await? +//! .register_event_handler(|_ev: SyncRoomMemberEvent| async { +//! // do stuff +//! }) +//! .await; //! //! let (host, port) = appservice.registration().get_host_and_port()?; //! appservice.run(host, port).await?; @@ -80,285 +85,50 @@ //! //! [Application Service]: https://matrix.org/docs/spec/application_service/r0.1.2 //! [matrix-org/matrix-rust-sdk#228]: https://github.com/matrix-org/matrix-rust-sdk/issues/228 -//! [examples directory]: https://github.com/matrix-org/matrix-rust-sdk/tree/main/crates/matrix-sdk-appservice/examples +//! [examples directory]: https://github.com/matrix-org/matrix-rust-sdk/tree/master/matrix_sdk_appservice/examples -use std::{ - convert::{TryFrom, TryInto}, - fs::File, - future::Future, - ops::Deref, - path::PathBuf, - sync::Arc, -}; +use std::{convert::TryInto, sync::Arc}; use dashmap::DashMap; pub use error::Error; use event_handler::AppserviceFn; -use http::Uri; pub use matrix_sdk; #[doc(no_inline)] pub use matrix_sdk::ruma; -use matrix_sdk::{ - bytes::Bytes, - config::RequestConfig, - event_handler::{EventHandler, EventHandlerResult, SyncEvent}, - reqwest::Url, - Client, ClientBuildError, ClientBuilder, Session, -}; -use regex::Regex; +use matrix_sdk::{bytes::Bytes, reqwest::Url, Client, ClientBuilder}; use ruma::{ api::{ appservice::{ event::push_events, query::{query_room_alias::v1 as query_room, query_user_id::v1 as query_user}, - Registration, }, - client::{account::register, session::login, sync::sync_events, uiaa::UserIdentifier}, + client::{account::register, sync::sync_events}, }, assign, events::{room::member::MembershipState, AnyRoomEvent, AnyStateEvent}, - DeviceId, IdParseError, OwnedDeviceId, OwnedRoomId, OwnedServerName, UserId, + IdParseError, OwnedRoomId, OwnedServerName, }; -use serde::{de::DeserializeOwned, Deserialize}; +use serde::Deserialize; use tokio::task::JoinHandle; use tracing::{debug, info, warn}; mod error; pub mod event_handler; +pub mod registration; +pub mod virtual_user; mod webserver; +pub use registration::AppServiceRegistration; +use registration::NamespaceCache; +pub use virtual_user::VirtualUserBuilder; + pub type Result = std::result::Result; -pub type Host = String; -pub type Port = u16; const USER_KEY: &[u8] = b"appservice.users."; pub const USER_MEMBER: &[u8] = b"appservice.users.membership."; -/// Builder for a virtual user -#[derive(Debug)] -pub struct VirtualUserBuilder<'a> { - appservice: &'a AppService, - localpart: &'a str, - device_id: Option, - client_builder: ClientBuilder, - log_in: bool, - restored_session: Option, -} - -impl<'a> VirtualUserBuilder<'a> { - /// Create a new virtual user builder - /// # Arguments - /// - /// * `localpart` - The localpart of the virtual user - pub fn new(appservice: &'a AppService, localpart: &'a str) -> Self { - Self { - appservice, - localpart, - device_id: None, - client_builder: Client::builder(), - log_in: false, - restored_session: None, - } - } - - /// Set the device ID of the virtual user - pub fn device_id(mut self, device_id: Option) -> Self { - self.device_id = device_id; - self - } - - /// Sets the client builder to use for the virtual user - pub fn client_builder(mut self, client_builder: ClientBuilder) -> Self { - self.client_builder = client_builder; - self - } - - /// Log in as the virtual user - /// - /// In some cases it is necessary to log in as the virtual user, such as to - /// upload device keys - pub fn login(mut self) -> Self { - self.log_in = true; - self - } - - /// Restore a persisted session - /// - /// This is primarily useful if you enable - /// [`VirtualUserBuilder::login()`] and want to restore a session - /// from a previous run. - pub fn restored_session(mut self, session: Session) -> Self { - self.restored_session = Some(session); - self - } - - /// Build the virtual user - /// - /// # Errors - /// This function returns an error if an invalid localpart is provided. - pub async fn build(self) -> Result { - if let Some(client) = self.appservice.clients.get(self.localpart) { - return Ok(client.clone()); - } - - let user_id = UserId::parse_with_server_name(self.localpart, &self.appservice.server_name)?; - if !(self.appservice.user_id_is_in_namespace(&user_id) - || self.localpart == self.appservice.registration.sender_localpart) - { - warn!("Virtual client id '{user_id}' is not in the namespace") - } - - let mut builder = self.client_builder; - - if !self.log_in && self.localpart != self.appservice.registration.sender_localpart { - builder = builder.assert_identity(); - } - - let client = builder - .homeserver_url(self.appservice.homeserver_url.clone()) - .appservice_mode() - .build() - .await - .map_err(ClientBuildError::assert_valid_builder_args)?; - - let session = if let Some(session) = self.restored_session { - session - } else if self.log_in && self.localpart != self.appservice.registration.sender_localpart { - self.appservice - .create_session(self.localpart, self.device_id.as_ref().map(|v| v.as_ref()), None) - .await? - } else { - // Don’t log in - Session { - access_token: self.appservice.registration.as_token.clone(), - user_id: user_id.clone(), - device_id: self.device_id.unwrap_or_else(DeviceId::new), - } - }; - - client.restore_login(session).await?; - - self.appservice.clients.insert(self.localpart.to_owned(), client.clone()); - - Ok(client) - } -} - -/// AppService Registration -/// -/// Wrapper around [`Registration`] -#[derive(Debug, Clone)] -pub struct AppServiceRegistration { - inner: Registration, -} - -impl AppServiceRegistration { - /// Try to load registration from yaml string - /// - /// See the fields of [`Registration`] for the required format - pub fn try_from_yaml_str(value: impl AsRef) -> Result { - Ok(Self { inner: serde_yaml::from_str(value.as_ref())? }) - } - - /// Try to load registration from yaml file - /// - /// See the fields of [`Registration`] for the required format - pub fn try_from_yaml_file(path: impl Into) -> Result { - let file = File::open(path.into())?; - - Ok(Self { inner: serde_yaml::from_reader(file)? }) - } - - /// Get the host and port from the registration URL - /// - /// If no port is found it falls back to scheme defaults: 80 for http and - /// 443 for https - pub fn get_host_and_port(&self) -> Result<(Host, Port)> { - let uri = Uri::try_from(&self.inner.url)?; - - let host = uri.host().ok_or(Error::MissingRegistrationHost)?.to_owned(); - let port = match uri.port() { - Some(port) => Ok(port.as_u16()), - None => match uri.scheme_str() { - Some("http") => Ok(80), - Some("https") => Ok(443), - _ => Err(Error::MissingRegistrationPort), - }, - }?; - - Ok((host, port)) - } -} - -impl From for AppServiceRegistration { - fn from(value: Registration) -> Self { - Self { inner: value } - } -} - -impl Deref for AppServiceRegistration { - type Target = Registration; - - fn deref(&self) -> &Self::Target { - &self.inner - } -} - -/// Cache data for the registration namespaces. -#[derive(Debug, Clone)] -pub struct NamespaceCache { - /// List of user regexes in our namespace - users: Vec, - /// List of alias regexes in our namespace - #[allow(dead_code)] - aliases: Vec, - /// List of room id regexes in our namespace - #[allow(dead_code)] - rooms: Vec, -} - -impl NamespaceCache { - /// Creates a new registration cache from a [`Registration`] value - pub fn from_registration(registration: &Registration) -> Result { - let users = registration - .namespaces - .users - .iter() - .map(|user| Regex::new(&user.regex)) - .collect::, _>>()?; - let aliases = registration - .namespaces - .aliases - .iter() - .map(|user| Regex::new(&user.regex)) - .collect::, _>>()?; - let rooms = registration - .namespaces - .rooms - .iter() - .map(|user| Regex::new(&user.regex)) - .collect::, _>>()?; - Ok(NamespaceCache { users, aliases, rooms }) - } -} - type Localpart = String; -/// The `localpart` of the user associated with the application service via -/// `sender_localpart` in [`AppServiceRegistration`]. -/// -/// Dummy type for shared documentation -#[allow(dead_code)] -pub type MainUser = (); - -/// The application service may specify the virtual user to act as through use -/// of a user_id query string parameter on the request. The user specified in -/// the query string must be covered by one of the [`AppServiceRegistration`]'s -/// `users` namespaces. -/// -/// Dummy type for shared documentation -pub type VirtualUser = (); - /// AppService #[derive(Debug, Clone)] pub struct AppService { @@ -371,11 +141,13 @@ pub struct AppService { } impl AppService { - /// Create new AppService + /// Create a new AppService. /// - /// Also creates and caches a [`Client`] for the [`MainUser`]. - /// A default [`ClientBuilder`] is used, if you want to customize it - /// use [`with_client_builder()`][Self::with_client_builder] instead. + /// This will also construct a [`virtual_user()`][Self::virtual_user] for + /// the `sender_localpart` of the given registration. This virtual user can + /// be used to register an event handler for all incoming events. Other + /// virtual users only receive events if they're known to be a member of a + /// room. /// /// # Arguments /// @@ -399,7 +171,7 @@ impl AppService { } /// Same as [`new()`][Self::new] but lets you provide a [`ClientBuilder`] - /// for the [`Client`] + /// for the virtual user that gets constructed for the `sender_localpart`. pub async fn with_client_builder( homeserver_url: impl TryInto, server_name: impl TryInto, @@ -423,143 +195,56 @@ impl AppService { event_handler, }; - // we create and cache the [`MainUser`] by default appservice.virtual_user_builder(&sender_localpart).client_builder(builder).build().await?; Ok(appservice) } - /// Create a [`Client`] for the given [`VirtualUser`]'s `localpart` + /// Create a virtual user client. /// - /// Will create and return a [`Client`] that's configured to [assert the - /// identity] on all outgoing homeserver requests if `localpart` is - /// given. + /// Will create and return a client that's configured to [assert the + /// identity] on outgoing homeserver requests that need authentication. /// /// This method is a singleton that saves the client internally for re-use - /// based on the `localpart`. The cached [`Client`] can be retrieved either - /// by calling this method again or by calling - /// [`get_cached_client()`][Self::get_cached_client] which is non-async - /// convenience wrapper. + /// based on the `localpart`. The cached client can be retrieved by calling + /// this method again. /// /// Note that if you want to do actions like joining rooms with a virtual - /// user it needs to be registered first. `Self::register_virtual_user()` - /// can be used for that purpose. + /// user it needs to be registered first. + /// [`register_virtual_user()`][Self::register_virtual_user] can be used + /// for that purpose. /// /// # Arguments /// - /// * `localpart` - The localpart of the user we want assert our identity to + /// * `localpart` - Used for constructing the accordingly. If `None` is + /// given it uses the `sender_localpart` from the registration. /// /// [registration]: https://matrix.org/docs/spec/application_service/r0.1.2#registration /// [assert the identity]: https://matrix.org/docs/spec/application_service/r0.1.2#identity-assertion - pub async fn virtual_user_client(&self, localpart: impl AsRef) -> Result { - self.virtual_user_builder(localpart.as_ref()).build().await + pub async fn virtual_user(&self, localpart: Option<&str>) -> Result { + let localpart = localpart.unwrap_or_else(|| self.registration.sender_localpart.as_ref()); + self.virtual_user_builder(localpart).build().await } - /// Same as [`virtual_user_client()`][Self::virtual_user_client] but with - /// the ability to pass in a [`ClientBuilder`] + /// Same as [`virtual_user()`][Self::virtual_user] but with + /// the ability to pass in a [`ClientBuilder`]. /// /// Since this method is a singleton follow-up calls with different /// [`ClientBuilder`]s will be ignored. - pub async fn virtual_user_client_with_client_builder( + pub async fn virtual_user_with_client_builder( &self, - localpart: impl AsRef, + localpart: Option<&str>, builder: ClientBuilder, ) -> Result { - self.virtual_user_builder(localpart.as_ref()).client_builder(builder).build().await + let localpart = localpart.unwrap_or_else(|| self.registration.sender_localpart.as_ref()); + self.virtual_user_builder(localpart).client_builder(builder).build().await } + /// Create a new virtual user builder for the given `localpart`. pub fn virtual_user_builder<'a>(&'a self, localpart: &'a str) -> VirtualUserBuilder<'a> { VirtualUserBuilder::new(self, localpart) } - /// Create a session using appservice login for a virtual user. - async fn create_session( - &self, - user: impl AsRef, - device_id: Option<&str>, - initial_device_display_name: Option<&str>, - ) -> Result { - let homeserver = self.homeserver_url.clone(); - info!(homeserver = homeserver.as_str(), user = user.as_ref(), "Logging in as virtual user"); - - let login_info = login::v3::LoginInfo::ApplicationService( - login::v3::ApplicationService::new(UserIdentifier::UserIdOrLocalpart(user.as_ref())), - ); - - let request = assign!(login::v3::Request::new(login_info), { - device_id: device_id.map(|d| d.into()), - initial_device_display_name - }); - - let response = self - .get_cached_client(None)? - .send(request, Some(RequestConfig::short_retry().force_auth())) - .await?; - - Ok(Session { - access_token: response.access_token, - user_id: response.user_id, - device_id: response.device_id, - }) - } - - /// Get cached [`Client`] - /// - /// Will return the client for the given `localpart` if previously - /// constructed with [`virtual_user_client()`][Self::virtual_user_client] or - /// [`virtual_user_client_with_config()`][Self:: - /// virtual_user_client_with_client_builder]. - /// - /// If no `localpart` is given it assumes the [`MainUser`]'s `localpart`. If - /// no client for `localpart` is found it will return an Error. - pub fn get_cached_client(&self, localpart: Option<&str>) -> Result { - let localpart = localpart.unwrap_or_else(|| self.registration.sender_localpart.as_ref()); - - let entry = self.clients.get(localpart).ok_or(Error::NoClientForLocalpart)?; - - Ok(entry.value().clone()) - } - - /// Convenience wrapper around [`Client::register_event_handler()`] that - /// attaches the event handler to the [`MainUser`]'s [`Client`] - /// - /// Note that the event handler in the [`AppService`] context only triggers - /// [`join` room `timeline` events], so no state events or events from the - /// `invite`, `knock` or `leave` scope. The rationale behind that is - /// that incoming AppService transactions from the homeserver are not - /// necessarily bound to a specific user but can cover a multitude of - /// namespaces, and as such the AppService basically only "observes - /// joined rooms". Also currently homeservers only push PDUs to appservices, - /// no EDUs. There's the open [MSC2409] regarding supporting EDUs in the - /// future, though it seems to be planned to put EDUs into a different - /// JSON key than `events` to stay backwards compatible. - /// - /// [`join` room `timeline` events]: https://spec.matrix.org/unstable/client-server-api/#get_matrixclientr0sync - /// [MSC2409]: https://github.com/matrix-org/matrix-doc/pull/2409 - pub async fn register_event_handler(&self, handler: H) -> Result<&Self> - where - Ev: SyncEvent + DeserializeOwned + Send + 'static, - H: EventHandler, - ::Output: EventHandlerResult, - { - let client = self.get_cached_client(None)?; - client.register_event_handler(handler).await; - - Ok(self) - } - - /// Convenience wrapper around [`Client::register_event_handler_context`] - /// attaches the event handler context to the [`MainUser`]'s [`Client`]. - pub fn register_event_handler_context(&self, ctx: T) -> Result<&Self> - where - T: Clone + Send + Sync + 'static, - { - let client = self.get_cached_client(None)?; - client.register_event_handler_context(ctx); - - Ok(self) - } - /// Register a responder for queries about the existence of a user with a /// given mxid. /// @@ -605,12 +290,12 @@ impl AppService { } /// Register a virtual user by sending a [`register::v3::Request`] to the - /// homeserver + /// homeserver. /// /// # Arguments /// /// * `localpart` - The localpart of the user to register. Must be covered - /// by the namespaces in the [`Registration`] in order to succeed. + /// by the namespaces in the registration in order to succeed. /// /// # Returns /// This function may return a UIAA response, which should be checked for @@ -624,7 +309,7 @@ impl AppService { login_type: Some(®ister::LoginType::ApplicationService), }); - let client = self.get_cached_client(None)?; + let client = self.virtual_user(None).await?; client.register(request).await?; self.set_user_registered(localpart.as_ref()).await?; @@ -633,7 +318,7 @@ impl AppService { /// Add the given localpart to the database of registered localparts. async fn set_user_registered(&self, localpart: impl AsRef) -> Result<()> { - let client = self.get_cached_client(None)?; + let client = self.virtual_user(None).await?; client .store() .set_custom_value( @@ -646,7 +331,7 @@ impl AppService { /// Get whether a localpart is listed in the database as registered. async fn is_user_registered(&self, localpart: impl AsRef) -> Result { - let client = self.get_cached_client(None)?; + let client = self.virtual_user(None).await?; let key = [USER_KEY, localpart.as_ref().as_bytes()].concat(); let store = client.store().get_custom_value(&key).await?; let registered = @@ -654,14 +339,12 @@ impl AppService { Ok(registered) } - /// Get the AppService [registration] - /// - /// [registration]: https://matrix.org/docs/spec/application_service/r0.1.2#registration + /// Get the [`AppServiceRegistration`]. pub fn registration(&self) -> &AppServiceRegistration { &self.registration } - /// Compare the given `hs_token` against `registration.hs_token` + /// Compare the given `hs_token` against the registration's `hs_token`. /// /// Returns `true` if the tokens match, `false` otherwise. pub fn compare_hs_token(&self, hs_token: impl AsRef) -> bool { @@ -680,12 +363,16 @@ impl AppService { false } - /// Returns a [`warp::Filter`] to be used as [`warp::serve()`] route + /// Returns a [`warp::Filter`] to be used as [`warp::serve()`] route. /// /// Note that if you handle any of the [application-service-specific /// routes], including the legacy routes, you will break the appservice /// functionality. /// + /// Hint: [`warp::Filter`]s can be converted to an `hyper::Service` using + /// [`warp::service`], which allows using it with tower-compatible + /// frameworks such as axum. + /// /// [application-service-specific routes]: https://spec.matrix.org/unstable/application-service-api/#legacy-routes pub fn warp_filter(&self) -> warp::filters::BoxedFilter<(impl warp::Reply,)> { webserver::warp_filter(self.clone()) @@ -699,7 +386,7 @@ impl AppService { &self, transaction: push_events::v1::IncomingRequest, ) -> Result<()> { - let client = self.get_cached_client(None)?; + let sender_localpart_client = self.virtual_user(None).await?; // Find membership events affecting members in our namespace, and update // membership accordingly @@ -712,7 +399,7 @@ impl AppService { continue; } let localpart = event.state_key().localpart(); - client + sender_localpart_client .store() .set_custom_value( &[USER_MEMBER, event.room_id().as_bytes(), b".", localpart.as_bytes()].concat(), @@ -730,24 +417,27 @@ impl AppService { // Spawn a task for each client that constructs and pushes a sync event let mut tasks: Vec> = Vec::new(); let transaction = Arc::new(transaction); - for virt_client in self.clients.iter() { - let client = client.clone(); - let virt_client = virt_client.clone(); + for virtual_user_client in self.clients.iter() { + let client = sender_localpart_client.clone(); + let virtual_user_client = virtual_user_client.clone(); let transaction = transaction.clone(); - let appserv_uid = self.registration.sender_localpart.clone(); + let sender_localpart = self.registration.sender_localpart.clone(); let task = tokio::spawn(async move { - let user_id = match virt_client.user_id() { + let virtual_user_localpart = match virtual_user_client.user_id() { Some(user_id) => user_id.localpart(), // The client is not logged in, skipping None => return Ok(()), }; let mut response = sync_events::v3::Response::new(transaction.txn_id.to_string()); - // Clients expect events to be grouped per room, where the group also denotes - // what the client's membership of the given room is. We take the - // all the events in the transaction and sort them into appropriate - // groups, falling back to a membership of "join" if it's unknown. + // Clients expect events to be grouped per room, where the + // group also denotes what the client's membership of the given + // room is. We take all the events in the transaction and sort + // them into appropriate groups. + // + // We special-case the `sender_localpart` user which receives all events and + // by falling back to a membership of "join" if it's unknown. for raw_event in &transaction.events { let room_id = match raw_event.deserialize_as::()?.room_id { Some(room_id) => room_id, @@ -756,11 +446,15 @@ impl AppService { continue; } }; - let key = &[USER_MEMBER, room_id.as_bytes(), b".", user_id.as_bytes()].concat(); + let key = + &[USER_MEMBER, room_id.as_bytes(), b".", virtual_user_localpart.as_bytes()] + .concat(); let membership = match client.store().get_custom_value(key).await? { Some(value) => String::from_utf8(value).ok().map(MembershipState::from), - // Assume the appservice is in every known room - None if user_id == appserv_uid => Some(MembershipState::Join), + // Assume the `sender_localpart` user is in every known room + None if virtual_user_localpart == sender_localpart => { + Some(MembershipState::Join) + } None => None, }; @@ -780,10 +474,10 @@ impl AppService { response.rooms.invite.entry(room_id).or_default(); } Some(unknown) => debug!("Unknown membership type: {unknown}"), - None => debug!("Assuming {user_id} is not in {room_id}"), + None => debug!("Assuming {virtual_user_localpart} is not in {room_id}"), } } - virt_client.receive_transaction(&transaction.txn_id, response).await?; + virtual_user_client.receive_transaction(&transaction.txn_id, response).await?; Ok::<_, Error>(()) }); @@ -798,10 +492,10 @@ impl AppService { } /// Convenience method that runs an http server depending on the selected - /// server feature + /// server feature. /// /// This is a blocking call that tries to listen on the provided host and - /// port + /// port. pub async fn run(&self, host: impl Into, port: impl Into) -> Result<()> { let host = host.into(); let port = port.into(); diff --git a/crates/matrix-sdk-appservice/src/registration.rs b/crates/matrix-sdk-appservice/src/registration.rs new file mode 100644 index 000000000..2c581d28f --- /dev/null +++ b/crates/matrix-sdk-appservice/src/registration.rs @@ -0,0 +1,124 @@ +// Copyright 2022 Famedly GmbH +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! AppService Registration. + +use std::{convert::TryFrom, fs::File, ops::Deref, path::PathBuf}; + +use http::Uri; +use regex::Regex; +use ruma::api::appservice::Registration; + +use crate::{Error, Result}; + +pub type Host = String; +pub type Port = u16; + +/// AppService Registration +/// +/// Wrapper around [`Registration`]. See also . +#[derive(Debug, Clone)] +pub struct AppServiceRegistration { + inner: Registration, +} + +impl AppServiceRegistration { + /// Try to load registration from yaml string + /// + /// See the fields of [`Registration`] for the required format + pub fn try_from_yaml_str(value: impl AsRef) -> Result { + Ok(Self { inner: serde_yaml::from_str(value.as_ref())? }) + } + + /// Try to load registration from yaml file + /// + /// See the fields of [`Registration`] for the required format + pub fn try_from_yaml_file(path: impl Into) -> Result { + let file = File::open(path.into())?; + + Ok(Self { inner: serde_yaml::from_reader(file)? }) + } + + /// Get the host and port from the registration URL + /// + /// If no port is found it falls back to scheme defaults: 80 for http and + /// 443 for https + pub fn get_host_and_port(&self) -> Result<(Host, Port)> { + let uri = Uri::try_from(&self.inner.url)?; + + let host = uri.host().ok_or(Error::MissingRegistrationHost)?.to_owned(); + let port = match uri.port() { + Some(port) => Ok(port.as_u16()), + None => match uri.scheme_str() { + Some("http") => Ok(80), + Some("https") => Ok(443), + _ => Err(Error::MissingRegistrationPort), + }, + }?; + + Ok((host, port)) + } +} + +impl From for AppServiceRegistration { + fn from(value: Registration) -> Self { + Self { inner: value } + } +} + +impl Deref for AppServiceRegistration { + type Target = Registration; + + fn deref(&self) -> &Self::Target { + &self.inner + } +} + +/// Cache data for the registration namespaces. +#[derive(Debug, Clone)] +pub struct NamespaceCache { + /// List of user regexes in our namespace + pub(crate) users: Vec, + /// List of alias regexes in our namespace + #[allow(dead_code)] + aliases: Vec, + /// List of room id regexes in our namespace + #[allow(dead_code)] + rooms: Vec, +} + +impl NamespaceCache { + /// Creates a new registration cache from a [`Registration`] value + pub fn from_registration(registration: &Registration) -> Result { + let users = registration + .namespaces + .users + .iter() + .map(|user| Regex::new(&user.regex)) + .collect::, _>>()?; + let aliases = registration + .namespaces + .aliases + .iter() + .map(|user| Regex::new(&user.regex)) + .collect::, _>>()?; + let rooms = registration + .namespaces + .rooms + .iter() + .map(|user| Regex::new(&user.regex)) + .collect::, _>>()?; + Ok(NamespaceCache { users, aliases, rooms }) + } +} diff --git a/crates/matrix-sdk-appservice/src/virtual_user.rs b/crates/matrix-sdk-appservice/src/virtual_user.rs new file mode 100644 index 000000000..ec49e61c9 --- /dev/null +++ b/crates/matrix-sdk-appservice/src/virtual_user.rs @@ -0,0 +1,149 @@ +// Copyright 2022 Famedly GmbH +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Virtual users. + +use matrix_sdk::{config::RequestConfig, Client, ClientBuildError, ClientBuilder, Session}; +use ruma::{ + api::client::{session::login, uiaa::UserIdentifier}, + assign, DeviceId, OwnedDeviceId, UserId, +}; +use tracing::warn; + +use crate::{AppService, Result}; + +/// Builder for a virtual user +#[derive(Debug)] +pub struct VirtualUserBuilder<'a> { + appservice: &'a AppService, + localpart: &'a str, + device_id: Option, + client_builder: ClientBuilder, + log_in: bool, + restored_session: Option, +} + +impl<'a> VirtualUserBuilder<'a> { + /// Create a new virtual user builder + /// # Arguments + /// + /// * `localpart` - The localpart of the virtual user + pub fn new(appservice: &'a AppService, localpart: &'a str) -> Self { + Self { + appservice, + localpart, + device_id: None, + client_builder: Client::builder(), + log_in: false, + restored_session: None, + } + } + + /// Set the device ID of the virtual user + pub fn device_id(mut self, device_id: Option) -> Self { + self.device_id = device_id; + self + } + + /// Sets the client builder to use for the virtual user + pub fn client_builder(mut self, client_builder: ClientBuilder) -> Self { + self.client_builder = client_builder; + self + } + + /// Log in as the virtual user + /// + /// In some cases it is necessary to log in as the virtual user, such as to + /// upload device keys + pub fn login(mut self) -> Self { + self.log_in = true; + self + } + + /// Restore a persisted session + /// + /// This is primarily useful if you enable + /// [`VirtualUserBuilder::login()`] and want to restore a session + /// from a previous run. + pub fn restored_session(mut self, session: Session) -> Self { + self.restored_session = Some(session); + self + } + + /// Build the virtual user + /// + /// # Errors + /// This function returns an error if an invalid localpart is provided. + pub async fn build(self) -> Result { + if let Some(client) = self.appservice.clients.get(self.localpart) { + return Ok(client.clone()); + } + + let user_id = UserId::parse_with_server_name(self.localpart, &self.appservice.server_name)?; + if !(self.appservice.user_id_is_in_namespace(&user_id) + || self.localpart == self.appservice.registration.sender_localpart) + { + warn!("Virtual client id '{user_id}' is not in the namespace") + } + + let mut builder = self.client_builder; + + if !self.log_in && self.localpart != self.appservice.registration.sender_localpart { + builder = builder.assert_identity(); + } + + let client = builder + .homeserver_url(self.appservice.homeserver_url.clone()) + .appservice_mode() + .build() + .await + .map_err(ClientBuildError::assert_valid_builder_args)?; + + let session = if let Some(session) = self.restored_session { + session + } else if self.log_in && self.localpart != self.appservice.registration.sender_localpart { + let login_info = + login::v3::LoginInfo::ApplicationService(login::v3::ApplicationService::new( + UserIdentifier::UserIdOrLocalpart(self.localpart), + )); + + let request = assign!(login::v3::Request::new(login_info), { + device_id: self.device_id.as_ref().map(|v| v.as_ref()), + initial_device_display_name: None, + }); + + let response = + client.send(request, Some(RequestConfig::short_retry().force_auth())).await?; + + Session { + access_token: response.access_token, + user_id: response.user_id, + device_id: response.device_id, + } + } else { + // Don’t log in + Session { + access_token: self.appservice.registration.as_token.clone(), + user_id: user_id.clone(), + device_id: self.device_id.unwrap_or_else(DeviceId::new), + } + }; + + client.restore_login(session).await?; + + self.appservice.clients.insert(self.localpart.to_owned(), client.clone()); + + Ok(client) + } +} diff --git a/crates/matrix-sdk-appservice/tests/tests.rs b/crates/matrix-sdk-appservice/tests/tests.rs index 61180ed97..fc74555a5 100644 --- a/crates/matrix-sdk-appservice/tests/tests.rs +++ b/crates/matrix-sdk-appservice/tests/tests.rs @@ -125,6 +125,8 @@ async fn test_put_transaction_with_repeating_txn_id() -> Result<()> { #[allow(clippy::mutex_atomic)] let on_state_member = Arc::new(Mutex::new(false)); appservice + .virtual_user(None) + .await? .register_event_handler({ let on_state_member = on_state_member.clone(); move |_ev: OriginalSyncRoomMemberEvent| { @@ -132,7 +134,7 @@ async fn test_put_transaction_with_repeating_txn_id() -> Result<()> { future::ready(()) } }) - .await?; + .await; let status = warp::test::request() .method("PUT") @@ -279,6 +281,8 @@ async fn test_event_handler() -> Result<()> { #[allow(clippy::mutex_atomic)] let on_state_member = Arc::new(Mutex::new(false)); appservice + .virtual_user(None) + .await? .register_event_handler({ let on_state_member = on_state_member.clone(); move |_ev: OriginalSyncRoomMemberEvent| { @@ -286,7 +290,7 @@ async fn test_event_handler() -> Result<()> { future::ready(()) } }) - .await?; + .await; let uri = "/_matrix/app/v1/transactions/1?access_token=hs_token"; @@ -365,7 +369,8 @@ async fn test_appservice_on_sub_path() -> Result<()> { }; let members = appservice - .get_cached_client(None)? + .virtual_user(None) + .await? .get_room(room_id) .expect("Expected room to be available") .members_no_sync() @@ -455,8 +460,8 @@ async fn test_receive_transaction() -> Result<()> { ]; let appservice = appservice(None, None).await?; - let alice = appservice.virtual_user_client("_appservice_alice").await?; - let bob = appservice.virtual_user_client("_appservice_bob").await?; + let alice = appservice.virtual_user(Some("_appservice_alice")).await?; + let bob = appservice.virtual_user(Some("_appservice_bob")).await?; appservice .receive_transaction(push_events::v1::IncomingRequest::new("dontcare".into(), json)) .await?;