mirror of
https://github.com/matrix-org/matrix-rust-sdk.git
synced 2026-04-26 10:11:10 -04:00
refactor(appservice)!: Improve API and cleanup docs
This commit is contained in:
@@ -31,7 +31,7 @@ pub async fn handle_room_member(
|
||||
error_if_user_not_in_use(error)?;
|
||||
}
|
||||
|
||||
let client = appservice.virtual_user_client(user_id.localpart()).await?;
|
||||
let client = appservice.virtual_user(Some(user_id.localpart())).await?;
|
||||
client.join_room_by_id(room.room_id()).await?;
|
||||
}
|
||||
|
||||
@@ -61,7 +61,9 @@ pub async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
let appservice = AppService::new(homeserver_url, server_name, registration).await?;
|
||||
appservice.register_user_query(Box::new(|_, _| Box::pin(async { true }))).await;
|
||||
appservice
|
||||
.register_event_handler_context(appservice.clone())?
|
||||
.virtual_user(None)
|
||||
.await?
|
||||
.register_event_handler_context(appservice.clone())
|
||||
.register_event_handler(
|
||||
move |event: OriginalSyncRoomMemberEvent,
|
||||
room: Room,
|
||||
@@ -69,7 +71,7 @@ pub async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
handle_room_member(appservice, room, event)
|
||||
},
|
||||
)
|
||||
.await?;
|
||||
.await;
|
||||
|
||||
let (host, port) = appservice.registration().get_host_and_port()?;
|
||||
appservice.run(host, port).await?;
|
||||
|
||||
@@ -1,3 +1,17 @@
|
||||
// Copyright 2022 Famedly GmbH
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::{future::Future, pin::Pin, sync::Arc};
|
||||
|
||||
use matrix_sdk::locks::Mutex;
|
||||
|
||||
@@ -17,12 +17,13 @@
|
||||
//! The appservice crate aims to provide a batteries-included experience by
|
||||
//! being a thin wrapper around the [`matrix_sdk`]. That means that we
|
||||
//!
|
||||
//! * ship with functionality to configure your webserver crate or simply run
|
||||
//! the webserver for you
|
||||
//! * receive and validate requests from the homeserver correctly
|
||||
//! * allow calling the homeserver with proper virtual user identity assertion
|
||||
//! * have consistent room state by leveraging matrix-sdk's state store
|
||||
//! * provide E2EE support by leveraging matrix-sdk's crypto store
|
||||
//! - [x] ship with functionality to configure your webserver crate or simply
|
||||
//! run the webserver for you
|
||||
//! - [x] receive and validate requests from the homeserver correctly
|
||||
//! - [x] allow calling the homeserver with proper virtual user identity
|
||||
//! assertion
|
||||
//! - [x] have consistent room state by leveraging matrix-sdk's state store
|
||||
//! - [ ] provide E2EE support by leveraging matrix-sdk's crypto store
|
||||
//!
|
||||
//! # Status
|
||||
//!
|
||||
@@ -31,10 +32,10 @@
|
||||
//!
|
||||
//! # Registration
|
||||
//!
|
||||
//! The crate relies on the registration being always in sync with the actual
|
||||
//! registration used by the homeserver. That's because it's required for the
|
||||
//! access tokens and because membership states for virtual users are determined
|
||||
//! based on the registered namespace.
|
||||
//! The crate relies on the appservice registration being always in sync with
|
||||
//! the actual registration used by the homeserver. That's because it's required
|
||||
//! for the access tokens and because membership states for virtual users are
|
||||
//! determined based on the registered namespaces.
|
||||
//!
|
||||
//! **Note:** Non-exclusive registration namespaces are not yet supported and
|
||||
//! hence might lead to undefined behavior.
|
||||
@@ -65,9 +66,13 @@
|
||||
//! ")?;
|
||||
//!
|
||||
//! let mut appservice = AppService::new(homeserver_url, server_name, registration).await?;
|
||||
//! appservice.register_event_handler(|_ev: SyncRoomMemberEvent| async {
|
||||
//! // do stuff
|
||||
//! });
|
||||
//! appservice
|
||||
//! .virtual_user(None)
|
||||
//! .await?
|
||||
//! .register_event_handler(|_ev: SyncRoomMemberEvent| async {
|
||||
//! // do stuff
|
||||
//! })
|
||||
//! .await;
|
||||
//!
|
||||
//! let (host, port) = appservice.registration().get_host_and_port()?;
|
||||
//! appservice.run(host, port).await?;
|
||||
@@ -80,285 +85,50 @@
|
||||
//!
|
||||
//! [Application Service]: https://matrix.org/docs/spec/application_service/r0.1.2
|
||||
//! [matrix-org/matrix-rust-sdk#228]: https://github.com/matrix-org/matrix-rust-sdk/issues/228
|
||||
//! [examples directory]: https://github.com/matrix-org/matrix-rust-sdk/tree/main/crates/matrix-sdk-appservice/examples
|
||||
//! [examples directory]: https://github.com/matrix-org/matrix-rust-sdk/tree/master/matrix_sdk_appservice/examples
|
||||
|
||||
use std::{
|
||||
convert::{TryFrom, TryInto},
|
||||
fs::File,
|
||||
future::Future,
|
||||
ops::Deref,
|
||||
path::PathBuf,
|
||||
sync::Arc,
|
||||
};
|
||||
use std::{convert::TryInto, sync::Arc};
|
||||
|
||||
use dashmap::DashMap;
|
||||
pub use error::Error;
|
||||
use event_handler::AppserviceFn;
|
||||
use http::Uri;
|
||||
pub use matrix_sdk;
|
||||
#[doc(no_inline)]
|
||||
pub use matrix_sdk::ruma;
|
||||
use matrix_sdk::{
|
||||
bytes::Bytes,
|
||||
config::RequestConfig,
|
||||
event_handler::{EventHandler, EventHandlerResult, SyncEvent},
|
||||
reqwest::Url,
|
||||
Client, ClientBuildError, ClientBuilder, Session,
|
||||
};
|
||||
use regex::Regex;
|
||||
use matrix_sdk::{bytes::Bytes, reqwest::Url, Client, ClientBuilder};
|
||||
use ruma::{
|
||||
api::{
|
||||
appservice::{
|
||||
event::push_events,
|
||||
query::{query_room_alias::v1 as query_room, query_user_id::v1 as query_user},
|
||||
Registration,
|
||||
},
|
||||
client::{account::register, session::login, sync::sync_events, uiaa::UserIdentifier},
|
||||
client::{account::register, sync::sync_events},
|
||||
},
|
||||
assign,
|
||||
events::{room::member::MembershipState, AnyRoomEvent, AnyStateEvent},
|
||||
DeviceId, IdParseError, OwnedDeviceId, OwnedRoomId, OwnedServerName, UserId,
|
||||
IdParseError, OwnedRoomId, OwnedServerName,
|
||||
};
|
||||
use serde::{de::DeserializeOwned, Deserialize};
|
||||
use serde::Deserialize;
|
||||
use tokio::task::JoinHandle;
|
||||
use tracing::{debug, info, warn};
|
||||
|
||||
mod error;
|
||||
pub mod event_handler;
|
||||
pub mod registration;
|
||||
pub mod virtual_user;
|
||||
mod webserver;
|
||||
|
||||
pub use registration::AppServiceRegistration;
|
||||
use registration::NamespaceCache;
|
||||
pub use virtual_user::VirtualUserBuilder;
|
||||
|
||||
pub type Result<T, E = Error> = std::result::Result<T, E>;
|
||||
pub type Host = String;
|
||||
pub type Port = u16;
|
||||
|
||||
const USER_KEY: &[u8] = b"appservice.users.";
|
||||
pub const USER_MEMBER: &[u8] = b"appservice.users.membership.";
|
||||
|
||||
/// Builder for a virtual user
|
||||
#[derive(Debug)]
|
||||
pub struct VirtualUserBuilder<'a> {
|
||||
appservice: &'a AppService,
|
||||
localpart: &'a str,
|
||||
device_id: Option<OwnedDeviceId>,
|
||||
client_builder: ClientBuilder,
|
||||
log_in: bool,
|
||||
restored_session: Option<Session>,
|
||||
}
|
||||
|
||||
impl<'a> VirtualUserBuilder<'a> {
|
||||
/// Create a new virtual user builder
|
||||
/// # Arguments
|
||||
///
|
||||
/// * `localpart` - The localpart of the virtual user
|
||||
pub fn new(appservice: &'a AppService, localpart: &'a str) -> Self {
|
||||
Self {
|
||||
appservice,
|
||||
localpart,
|
||||
device_id: None,
|
||||
client_builder: Client::builder(),
|
||||
log_in: false,
|
||||
restored_session: None,
|
||||
}
|
||||
}
|
||||
|
||||
/// Set the device ID of the virtual user
|
||||
pub fn device_id(mut self, device_id: Option<OwnedDeviceId>) -> Self {
|
||||
self.device_id = device_id;
|
||||
self
|
||||
}
|
||||
|
||||
/// Sets the client builder to use for the virtual user
|
||||
pub fn client_builder(mut self, client_builder: ClientBuilder) -> Self {
|
||||
self.client_builder = client_builder;
|
||||
self
|
||||
}
|
||||
|
||||
/// Log in as the virtual user
|
||||
///
|
||||
/// In some cases it is necessary to log in as the virtual user, such as to
|
||||
/// upload device keys
|
||||
pub fn login(mut self) -> Self {
|
||||
self.log_in = true;
|
||||
self
|
||||
}
|
||||
|
||||
/// Restore a persisted session
|
||||
///
|
||||
/// This is primarily useful if you enable
|
||||
/// [`VirtualUserBuilder::login()`] and want to restore a session
|
||||
/// from a previous run.
|
||||
pub fn restored_session(mut self, session: Session) -> Self {
|
||||
self.restored_session = Some(session);
|
||||
self
|
||||
}
|
||||
|
||||
/// Build the virtual user
|
||||
///
|
||||
/// # Errors
|
||||
/// This function returns an error if an invalid localpart is provided.
|
||||
pub async fn build(self) -> Result<Client> {
|
||||
if let Some(client) = self.appservice.clients.get(self.localpart) {
|
||||
return Ok(client.clone());
|
||||
}
|
||||
|
||||
let user_id = UserId::parse_with_server_name(self.localpart, &self.appservice.server_name)?;
|
||||
if !(self.appservice.user_id_is_in_namespace(&user_id)
|
||||
|| self.localpart == self.appservice.registration.sender_localpart)
|
||||
{
|
||||
warn!("Virtual client id '{user_id}' is not in the namespace")
|
||||
}
|
||||
|
||||
let mut builder = self.client_builder;
|
||||
|
||||
if !self.log_in && self.localpart != self.appservice.registration.sender_localpart {
|
||||
builder = builder.assert_identity();
|
||||
}
|
||||
|
||||
let client = builder
|
||||
.homeserver_url(self.appservice.homeserver_url.clone())
|
||||
.appservice_mode()
|
||||
.build()
|
||||
.await
|
||||
.map_err(ClientBuildError::assert_valid_builder_args)?;
|
||||
|
||||
let session = if let Some(session) = self.restored_session {
|
||||
session
|
||||
} else if self.log_in && self.localpart != self.appservice.registration.sender_localpart {
|
||||
self.appservice
|
||||
.create_session(self.localpart, self.device_id.as_ref().map(|v| v.as_ref()), None)
|
||||
.await?
|
||||
} else {
|
||||
// Don’t log in
|
||||
Session {
|
||||
access_token: self.appservice.registration.as_token.clone(),
|
||||
user_id: user_id.clone(),
|
||||
device_id: self.device_id.unwrap_or_else(DeviceId::new),
|
||||
}
|
||||
};
|
||||
|
||||
client.restore_login(session).await?;
|
||||
|
||||
self.appservice.clients.insert(self.localpart.to_owned(), client.clone());
|
||||
|
||||
Ok(client)
|
||||
}
|
||||
}
|
||||
|
||||
/// AppService Registration
|
||||
///
|
||||
/// Wrapper around [`Registration`]
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct AppServiceRegistration {
|
||||
inner: Registration,
|
||||
}
|
||||
|
||||
impl AppServiceRegistration {
|
||||
/// Try to load registration from yaml string
|
||||
///
|
||||
/// See the fields of [`Registration`] for the required format
|
||||
pub fn try_from_yaml_str(value: impl AsRef<str>) -> Result<Self> {
|
||||
Ok(Self { inner: serde_yaml::from_str(value.as_ref())? })
|
||||
}
|
||||
|
||||
/// Try to load registration from yaml file
|
||||
///
|
||||
/// See the fields of [`Registration`] for the required format
|
||||
pub fn try_from_yaml_file(path: impl Into<PathBuf>) -> Result<Self> {
|
||||
let file = File::open(path.into())?;
|
||||
|
||||
Ok(Self { inner: serde_yaml::from_reader(file)? })
|
||||
}
|
||||
|
||||
/// Get the host and port from the registration URL
|
||||
///
|
||||
/// If no port is found it falls back to scheme defaults: 80 for http and
|
||||
/// 443 for https
|
||||
pub fn get_host_and_port(&self) -> Result<(Host, Port)> {
|
||||
let uri = Uri::try_from(&self.inner.url)?;
|
||||
|
||||
let host = uri.host().ok_or(Error::MissingRegistrationHost)?.to_owned();
|
||||
let port = match uri.port() {
|
||||
Some(port) => Ok(port.as_u16()),
|
||||
None => match uri.scheme_str() {
|
||||
Some("http") => Ok(80),
|
||||
Some("https") => Ok(443),
|
||||
_ => Err(Error::MissingRegistrationPort),
|
||||
},
|
||||
}?;
|
||||
|
||||
Ok((host, port))
|
||||
}
|
||||
}
|
||||
|
||||
impl From<Registration> for AppServiceRegistration {
|
||||
fn from(value: Registration) -> Self {
|
||||
Self { inner: value }
|
||||
}
|
||||
}
|
||||
|
||||
impl Deref for AppServiceRegistration {
|
||||
type Target = Registration;
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
&self.inner
|
||||
}
|
||||
}
|
||||
|
||||
/// Cache data for the registration namespaces.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct NamespaceCache {
|
||||
/// List of user regexes in our namespace
|
||||
users: Vec<Regex>,
|
||||
/// List of alias regexes in our namespace
|
||||
#[allow(dead_code)]
|
||||
aliases: Vec<Regex>,
|
||||
/// List of room id regexes in our namespace
|
||||
#[allow(dead_code)]
|
||||
rooms: Vec<Regex>,
|
||||
}
|
||||
|
||||
impl NamespaceCache {
|
||||
/// Creates a new registration cache from a [`Registration`] value
|
||||
pub fn from_registration(registration: &Registration) -> Result<Self> {
|
||||
let users = registration
|
||||
.namespaces
|
||||
.users
|
||||
.iter()
|
||||
.map(|user| Regex::new(&user.regex))
|
||||
.collect::<Result<Vec<_>, _>>()?;
|
||||
let aliases = registration
|
||||
.namespaces
|
||||
.aliases
|
||||
.iter()
|
||||
.map(|user| Regex::new(&user.regex))
|
||||
.collect::<Result<Vec<_>, _>>()?;
|
||||
let rooms = registration
|
||||
.namespaces
|
||||
.rooms
|
||||
.iter()
|
||||
.map(|user| Regex::new(&user.regex))
|
||||
.collect::<Result<Vec<_>, _>>()?;
|
||||
Ok(NamespaceCache { users, aliases, rooms })
|
||||
}
|
||||
}
|
||||
|
||||
type Localpart = String;
|
||||
|
||||
/// The `localpart` of the user associated with the application service via
|
||||
/// `sender_localpart` in [`AppServiceRegistration`].
|
||||
///
|
||||
/// Dummy type for shared documentation
|
||||
#[allow(dead_code)]
|
||||
pub type MainUser = ();
|
||||
|
||||
/// The application service may specify the virtual user to act as through use
|
||||
/// of a user_id query string parameter on the request. The user specified in
|
||||
/// the query string must be covered by one of the [`AppServiceRegistration`]'s
|
||||
/// `users` namespaces.
|
||||
///
|
||||
/// Dummy type for shared documentation
|
||||
pub type VirtualUser = ();
|
||||
|
||||
/// AppService
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct AppService {
|
||||
@@ -371,11 +141,13 @@ pub struct AppService {
|
||||
}
|
||||
|
||||
impl AppService {
|
||||
/// Create new AppService
|
||||
/// Create a new AppService.
|
||||
///
|
||||
/// Also creates and caches a [`Client`] for the [`MainUser`].
|
||||
/// A default [`ClientBuilder`] is used, if you want to customize it
|
||||
/// use [`with_client_builder()`][Self::with_client_builder] instead.
|
||||
/// This will also construct a [`virtual_user()`][Self::virtual_user] for
|
||||
/// the `sender_localpart` of the given registration. This virtual user can
|
||||
/// be used to register an event handler for all incoming events. Other
|
||||
/// virtual users only receive events if they're known to be a member of a
|
||||
/// room.
|
||||
///
|
||||
/// # Arguments
|
||||
///
|
||||
@@ -399,7 +171,7 @@ impl AppService {
|
||||
}
|
||||
|
||||
/// Same as [`new()`][Self::new] but lets you provide a [`ClientBuilder`]
|
||||
/// for the [`Client`]
|
||||
/// for the virtual user that gets constructed for the `sender_localpart`.
|
||||
pub async fn with_client_builder(
|
||||
homeserver_url: impl TryInto<Url, Error = url::ParseError>,
|
||||
server_name: impl TryInto<OwnedServerName, Error = IdParseError>,
|
||||
@@ -423,143 +195,56 @@ impl AppService {
|
||||
event_handler,
|
||||
};
|
||||
|
||||
// we create and cache the [`MainUser`] by default
|
||||
appservice.virtual_user_builder(&sender_localpart).client_builder(builder).build().await?;
|
||||
|
||||
Ok(appservice)
|
||||
}
|
||||
|
||||
/// Create a [`Client`] for the given [`VirtualUser`]'s `localpart`
|
||||
/// Create a virtual user client.
|
||||
///
|
||||
/// Will create and return a [`Client`] that's configured to [assert the
|
||||
/// identity] on all outgoing homeserver requests if `localpart` is
|
||||
/// given.
|
||||
/// Will create and return a client that's configured to [assert the
|
||||
/// identity] on outgoing homeserver requests that need authentication.
|
||||
///
|
||||
/// This method is a singleton that saves the client internally for re-use
|
||||
/// based on the `localpart`. The cached [`Client`] can be retrieved either
|
||||
/// by calling this method again or by calling
|
||||
/// [`get_cached_client()`][Self::get_cached_client] which is non-async
|
||||
/// convenience wrapper.
|
||||
/// based on the `localpart`. The cached client can be retrieved by calling
|
||||
/// this method again.
|
||||
///
|
||||
/// Note that if you want to do actions like joining rooms with a virtual
|
||||
/// user it needs to be registered first. `Self::register_virtual_user()`
|
||||
/// can be used for that purpose.
|
||||
/// user it needs to be registered first.
|
||||
/// [`register_virtual_user()`][Self::register_virtual_user] can be used
|
||||
/// for that purpose.
|
||||
///
|
||||
/// # Arguments
|
||||
///
|
||||
/// * `localpart` - The localpart of the user we want assert our identity to
|
||||
/// * `localpart` - Used for constructing the accordingly. If `None` is
|
||||
/// given it uses the `sender_localpart` from the registration.
|
||||
///
|
||||
/// [registration]: https://matrix.org/docs/spec/application_service/r0.1.2#registration
|
||||
/// [assert the identity]: https://matrix.org/docs/spec/application_service/r0.1.2#identity-assertion
|
||||
pub async fn virtual_user_client(&self, localpart: impl AsRef<str>) -> Result<Client> {
|
||||
self.virtual_user_builder(localpart.as_ref()).build().await
|
||||
pub async fn virtual_user(&self, localpart: Option<&str>) -> Result<Client> {
|
||||
let localpart = localpart.unwrap_or_else(|| self.registration.sender_localpart.as_ref());
|
||||
self.virtual_user_builder(localpart).build().await
|
||||
}
|
||||
|
||||
/// Same as [`virtual_user_client()`][Self::virtual_user_client] but with
|
||||
/// the ability to pass in a [`ClientBuilder`]
|
||||
/// Same as [`virtual_user()`][Self::virtual_user] but with
|
||||
/// the ability to pass in a [`ClientBuilder`].
|
||||
///
|
||||
/// Since this method is a singleton follow-up calls with different
|
||||
/// [`ClientBuilder`]s will be ignored.
|
||||
pub async fn virtual_user_client_with_client_builder(
|
||||
pub async fn virtual_user_with_client_builder(
|
||||
&self,
|
||||
localpart: impl AsRef<str>,
|
||||
localpart: Option<&str>,
|
||||
builder: ClientBuilder,
|
||||
) -> Result<Client> {
|
||||
self.virtual_user_builder(localpart.as_ref()).client_builder(builder).build().await
|
||||
let localpart = localpart.unwrap_or_else(|| self.registration.sender_localpart.as_ref());
|
||||
self.virtual_user_builder(localpart).client_builder(builder).build().await
|
||||
}
|
||||
|
||||
/// Create a new virtual user builder for the given `localpart`.
|
||||
pub fn virtual_user_builder<'a>(&'a self, localpart: &'a str) -> VirtualUserBuilder<'a> {
|
||||
VirtualUserBuilder::new(self, localpart)
|
||||
}
|
||||
|
||||
/// Create a session using appservice login for a virtual user.
|
||||
async fn create_session(
|
||||
&self,
|
||||
user: impl AsRef<str>,
|
||||
device_id: Option<&str>,
|
||||
initial_device_display_name: Option<&str>,
|
||||
) -> Result<Session> {
|
||||
let homeserver = self.homeserver_url.clone();
|
||||
info!(homeserver = homeserver.as_str(), user = user.as_ref(), "Logging in as virtual user");
|
||||
|
||||
let login_info = login::v3::LoginInfo::ApplicationService(
|
||||
login::v3::ApplicationService::new(UserIdentifier::UserIdOrLocalpart(user.as_ref())),
|
||||
);
|
||||
|
||||
let request = assign!(login::v3::Request::new(login_info), {
|
||||
device_id: device_id.map(|d| d.into()),
|
||||
initial_device_display_name
|
||||
});
|
||||
|
||||
let response = self
|
||||
.get_cached_client(None)?
|
||||
.send(request, Some(RequestConfig::short_retry().force_auth()))
|
||||
.await?;
|
||||
|
||||
Ok(Session {
|
||||
access_token: response.access_token,
|
||||
user_id: response.user_id,
|
||||
device_id: response.device_id,
|
||||
})
|
||||
}
|
||||
|
||||
/// Get cached [`Client`]
|
||||
///
|
||||
/// Will return the client for the given `localpart` if previously
|
||||
/// constructed with [`virtual_user_client()`][Self::virtual_user_client] or
|
||||
/// [`virtual_user_client_with_config()`][Self::
|
||||
/// virtual_user_client_with_client_builder].
|
||||
///
|
||||
/// If no `localpart` is given it assumes the [`MainUser`]'s `localpart`. If
|
||||
/// no client for `localpart` is found it will return an Error.
|
||||
pub fn get_cached_client(&self, localpart: Option<&str>) -> Result<Client> {
|
||||
let localpart = localpart.unwrap_or_else(|| self.registration.sender_localpart.as_ref());
|
||||
|
||||
let entry = self.clients.get(localpart).ok_or(Error::NoClientForLocalpart)?;
|
||||
|
||||
Ok(entry.value().clone())
|
||||
}
|
||||
|
||||
/// Convenience wrapper around [`Client::register_event_handler()`] that
|
||||
/// attaches the event handler to the [`MainUser`]'s [`Client`]
|
||||
///
|
||||
/// Note that the event handler in the [`AppService`] context only triggers
|
||||
/// [`join` room `timeline` events], so no state events or events from the
|
||||
/// `invite`, `knock` or `leave` scope. The rationale behind that is
|
||||
/// that incoming AppService transactions from the homeserver are not
|
||||
/// necessarily bound to a specific user but can cover a multitude of
|
||||
/// namespaces, and as such the AppService basically only "observes
|
||||
/// joined rooms". Also currently homeservers only push PDUs to appservices,
|
||||
/// no EDUs. There's the open [MSC2409] regarding supporting EDUs in the
|
||||
/// future, though it seems to be planned to put EDUs into a different
|
||||
/// JSON key than `events` to stay backwards compatible.
|
||||
///
|
||||
/// [`join` room `timeline` events]: https://spec.matrix.org/unstable/client-server-api/#get_matrixclientr0sync
|
||||
/// [MSC2409]: https://github.com/matrix-org/matrix-doc/pull/2409
|
||||
pub async fn register_event_handler<Ev, Ctx, H>(&self, handler: H) -> Result<&Self>
|
||||
where
|
||||
Ev: SyncEvent + DeserializeOwned + Send + 'static,
|
||||
H: EventHandler<Ev, Ctx>,
|
||||
<H::Future as Future>::Output: EventHandlerResult,
|
||||
{
|
||||
let client = self.get_cached_client(None)?;
|
||||
client.register_event_handler(handler).await;
|
||||
|
||||
Ok(self)
|
||||
}
|
||||
|
||||
/// Convenience wrapper around [`Client::register_event_handler_context`]
|
||||
/// attaches the event handler context to the [`MainUser`]'s [`Client`].
|
||||
pub fn register_event_handler_context<T>(&self, ctx: T) -> Result<&Self>
|
||||
where
|
||||
T: Clone + Send + Sync + 'static,
|
||||
{
|
||||
let client = self.get_cached_client(None)?;
|
||||
client.register_event_handler_context(ctx);
|
||||
|
||||
Ok(self)
|
||||
}
|
||||
|
||||
/// Register a responder for queries about the existence of a user with a
|
||||
/// given mxid.
|
||||
///
|
||||
@@ -605,12 +290,12 @@ impl AppService {
|
||||
}
|
||||
|
||||
/// Register a virtual user by sending a [`register::v3::Request`] to the
|
||||
/// homeserver
|
||||
/// homeserver.
|
||||
///
|
||||
/// # Arguments
|
||||
///
|
||||
/// * `localpart` - The localpart of the user to register. Must be covered
|
||||
/// by the namespaces in the [`Registration`] in order to succeed.
|
||||
/// by the namespaces in the registration in order to succeed.
|
||||
///
|
||||
/// # Returns
|
||||
/// This function may return a UIAA response, which should be checked for
|
||||
@@ -624,7 +309,7 @@ impl AppService {
|
||||
login_type: Some(®ister::LoginType::ApplicationService),
|
||||
});
|
||||
|
||||
let client = self.get_cached_client(None)?;
|
||||
let client = self.virtual_user(None).await?;
|
||||
client.register(request).await?;
|
||||
self.set_user_registered(localpart.as_ref()).await?;
|
||||
|
||||
@@ -633,7 +318,7 @@ impl AppService {
|
||||
|
||||
/// Add the given localpart to the database of registered localparts.
|
||||
async fn set_user_registered(&self, localpart: impl AsRef<str>) -> Result<()> {
|
||||
let client = self.get_cached_client(None)?;
|
||||
let client = self.virtual_user(None).await?;
|
||||
client
|
||||
.store()
|
||||
.set_custom_value(
|
||||
@@ -646,7 +331,7 @@ impl AppService {
|
||||
|
||||
/// Get whether a localpart is listed in the database as registered.
|
||||
async fn is_user_registered(&self, localpart: impl AsRef<str>) -> Result<bool> {
|
||||
let client = self.get_cached_client(None)?;
|
||||
let client = self.virtual_user(None).await?;
|
||||
let key = [USER_KEY, localpart.as_ref().as_bytes()].concat();
|
||||
let store = client.store().get_custom_value(&key).await?;
|
||||
let registered =
|
||||
@@ -654,14 +339,12 @@ impl AppService {
|
||||
Ok(registered)
|
||||
}
|
||||
|
||||
/// Get the AppService [registration]
|
||||
///
|
||||
/// [registration]: https://matrix.org/docs/spec/application_service/r0.1.2#registration
|
||||
/// Get the [`AppServiceRegistration`].
|
||||
pub fn registration(&self) -> &AppServiceRegistration {
|
||||
&self.registration
|
||||
}
|
||||
|
||||
/// Compare the given `hs_token` against `registration.hs_token`
|
||||
/// Compare the given `hs_token` against the registration's `hs_token`.
|
||||
///
|
||||
/// Returns `true` if the tokens match, `false` otherwise.
|
||||
pub fn compare_hs_token(&self, hs_token: impl AsRef<str>) -> bool {
|
||||
@@ -680,12 +363,16 @@ impl AppService {
|
||||
false
|
||||
}
|
||||
|
||||
/// Returns a [`warp::Filter`] to be used as [`warp::serve()`] route
|
||||
/// Returns a [`warp::Filter`] to be used as [`warp::serve()`] route.
|
||||
///
|
||||
/// Note that if you handle any of the [application-service-specific
|
||||
/// routes], including the legacy routes, you will break the appservice
|
||||
/// functionality.
|
||||
///
|
||||
/// Hint: [`warp::Filter`]s can be converted to an `hyper::Service` using
|
||||
/// [`warp::service`], which allows using it with tower-compatible
|
||||
/// frameworks such as axum.
|
||||
///
|
||||
/// [application-service-specific routes]: https://spec.matrix.org/unstable/application-service-api/#legacy-routes
|
||||
pub fn warp_filter(&self) -> warp::filters::BoxedFilter<(impl warp::Reply,)> {
|
||||
webserver::warp_filter(self.clone())
|
||||
@@ -699,7 +386,7 @@ impl AppService {
|
||||
&self,
|
||||
transaction: push_events::v1::IncomingRequest,
|
||||
) -> Result<()> {
|
||||
let client = self.get_cached_client(None)?;
|
||||
let sender_localpart_client = self.virtual_user(None).await?;
|
||||
|
||||
// Find membership events affecting members in our namespace, and update
|
||||
// membership accordingly
|
||||
@@ -712,7 +399,7 @@ impl AppService {
|
||||
continue;
|
||||
}
|
||||
let localpart = event.state_key().localpart();
|
||||
client
|
||||
sender_localpart_client
|
||||
.store()
|
||||
.set_custom_value(
|
||||
&[USER_MEMBER, event.room_id().as_bytes(), b".", localpart.as_bytes()].concat(),
|
||||
@@ -730,24 +417,27 @@ impl AppService {
|
||||
// Spawn a task for each client that constructs and pushes a sync event
|
||||
let mut tasks: Vec<JoinHandle<_>> = Vec::new();
|
||||
let transaction = Arc::new(transaction);
|
||||
for virt_client in self.clients.iter() {
|
||||
let client = client.clone();
|
||||
let virt_client = virt_client.clone();
|
||||
for virtual_user_client in self.clients.iter() {
|
||||
let client = sender_localpart_client.clone();
|
||||
let virtual_user_client = virtual_user_client.clone();
|
||||
let transaction = transaction.clone();
|
||||
let appserv_uid = self.registration.sender_localpart.clone();
|
||||
let sender_localpart = self.registration.sender_localpart.clone();
|
||||
|
||||
let task = tokio::spawn(async move {
|
||||
let user_id = match virt_client.user_id() {
|
||||
let virtual_user_localpart = match virtual_user_client.user_id() {
|
||||
Some(user_id) => user_id.localpart(),
|
||||
// The client is not logged in, skipping
|
||||
None => return Ok(()),
|
||||
};
|
||||
let mut response = sync_events::v3::Response::new(transaction.txn_id.to_string());
|
||||
|
||||
// Clients expect events to be grouped per room, where the group also denotes
|
||||
// what the client's membership of the given room is. We take the
|
||||
// all the events in the transaction and sort them into appropriate
|
||||
// groups, falling back to a membership of "join" if it's unknown.
|
||||
// Clients expect events to be grouped per room, where the
|
||||
// group also denotes what the client's membership of the given
|
||||
// room is. We take all the events in the transaction and sort
|
||||
// them into appropriate groups.
|
||||
//
|
||||
// We special-case the `sender_localpart` user which receives all events and
|
||||
// by falling back to a membership of "join" if it's unknown.
|
||||
for raw_event in &transaction.events {
|
||||
let room_id = match raw_event.deserialize_as::<EventRoomId>()?.room_id {
|
||||
Some(room_id) => room_id,
|
||||
@@ -756,11 +446,15 @@ impl AppService {
|
||||
continue;
|
||||
}
|
||||
};
|
||||
let key = &[USER_MEMBER, room_id.as_bytes(), b".", user_id.as_bytes()].concat();
|
||||
let key =
|
||||
&[USER_MEMBER, room_id.as_bytes(), b".", virtual_user_localpart.as_bytes()]
|
||||
.concat();
|
||||
let membership = match client.store().get_custom_value(key).await? {
|
||||
Some(value) => String::from_utf8(value).ok().map(MembershipState::from),
|
||||
// Assume the appservice is in every known room
|
||||
None if user_id == appserv_uid => Some(MembershipState::Join),
|
||||
// Assume the `sender_localpart` user is in every known room
|
||||
None if virtual_user_localpart == sender_localpart => {
|
||||
Some(MembershipState::Join)
|
||||
}
|
||||
None => None,
|
||||
};
|
||||
|
||||
@@ -780,10 +474,10 @@ impl AppService {
|
||||
response.rooms.invite.entry(room_id).or_default();
|
||||
}
|
||||
Some(unknown) => debug!("Unknown membership type: {unknown}"),
|
||||
None => debug!("Assuming {user_id} is not in {room_id}"),
|
||||
None => debug!("Assuming {virtual_user_localpart} is not in {room_id}"),
|
||||
}
|
||||
}
|
||||
virt_client.receive_transaction(&transaction.txn_id, response).await?;
|
||||
virtual_user_client.receive_transaction(&transaction.txn_id, response).await?;
|
||||
Ok::<_, Error>(())
|
||||
});
|
||||
|
||||
@@ -798,10 +492,10 @@ impl AppService {
|
||||
}
|
||||
|
||||
/// Convenience method that runs an http server depending on the selected
|
||||
/// server feature
|
||||
/// server feature.
|
||||
///
|
||||
/// This is a blocking call that tries to listen on the provided host and
|
||||
/// port
|
||||
/// port.
|
||||
pub async fn run(&self, host: impl Into<String>, port: impl Into<u16>) -> Result<()> {
|
||||
let host = host.into();
|
||||
let port = port.into();
|
||||
|
||||
124
crates/matrix-sdk-appservice/src/registration.rs
Normal file
124
crates/matrix-sdk-appservice/src/registration.rs
Normal file
@@ -0,0 +1,124 @@
|
||||
// Copyright 2022 Famedly GmbH
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
//! AppService Registration.
|
||||
|
||||
use std::{convert::TryFrom, fs::File, ops::Deref, path::PathBuf};
|
||||
|
||||
use http::Uri;
|
||||
use regex::Regex;
|
||||
use ruma::api::appservice::Registration;
|
||||
|
||||
use crate::{Error, Result};
|
||||
|
||||
pub type Host = String;
|
||||
pub type Port = u16;
|
||||
|
||||
/// AppService Registration
|
||||
///
|
||||
/// Wrapper around [`Registration`]. See also <https://matrix.org/docs/spec/application_service/r0.1.2#registration>.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct AppServiceRegistration {
|
||||
inner: Registration,
|
||||
}
|
||||
|
||||
impl AppServiceRegistration {
|
||||
/// Try to load registration from yaml string
|
||||
///
|
||||
/// See the fields of [`Registration`] for the required format
|
||||
pub fn try_from_yaml_str(value: impl AsRef<str>) -> Result<Self> {
|
||||
Ok(Self { inner: serde_yaml::from_str(value.as_ref())? })
|
||||
}
|
||||
|
||||
/// Try to load registration from yaml file
|
||||
///
|
||||
/// See the fields of [`Registration`] for the required format
|
||||
pub fn try_from_yaml_file(path: impl Into<PathBuf>) -> Result<Self> {
|
||||
let file = File::open(path.into())?;
|
||||
|
||||
Ok(Self { inner: serde_yaml::from_reader(file)? })
|
||||
}
|
||||
|
||||
/// Get the host and port from the registration URL
|
||||
///
|
||||
/// If no port is found it falls back to scheme defaults: 80 for http and
|
||||
/// 443 for https
|
||||
pub fn get_host_and_port(&self) -> Result<(Host, Port)> {
|
||||
let uri = Uri::try_from(&self.inner.url)?;
|
||||
|
||||
let host = uri.host().ok_or(Error::MissingRegistrationHost)?.to_owned();
|
||||
let port = match uri.port() {
|
||||
Some(port) => Ok(port.as_u16()),
|
||||
None => match uri.scheme_str() {
|
||||
Some("http") => Ok(80),
|
||||
Some("https") => Ok(443),
|
||||
_ => Err(Error::MissingRegistrationPort),
|
||||
},
|
||||
}?;
|
||||
|
||||
Ok((host, port))
|
||||
}
|
||||
}
|
||||
|
||||
impl From<Registration> for AppServiceRegistration {
|
||||
fn from(value: Registration) -> Self {
|
||||
Self { inner: value }
|
||||
}
|
||||
}
|
||||
|
||||
impl Deref for AppServiceRegistration {
|
||||
type Target = Registration;
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
&self.inner
|
||||
}
|
||||
}
|
||||
|
||||
/// Cache data for the registration namespaces.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct NamespaceCache {
|
||||
/// List of user regexes in our namespace
|
||||
pub(crate) users: Vec<Regex>,
|
||||
/// List of alias regexes in our namespace
|
||||
#[allow(dead_code)]
|
||||
aliases: Vec<Regex>,
|
||||
/// List of room id regexes in our namespace
|
||||
#[allow(dead_code)]
|
||||
rooms: Vec<Regex>,
|
||||
}
|
||||
|
||||
impl NamespaceCache {
|
||||
/// Creates a new registration cache from a [`Registration`] value
|
||||
pub fn from_registration(registration: &Registration) -> Result<Self> {
|
||||
let users = registration
|
||||
.namespaces
|
||||
.users
|
||||
.iter()
|
||||
.map(|user| Regex::new(&user.regex))
|
||||
.collect::<Result<Vec<_>, _>>()?;
|
||||
let aliases = registration
|
||||
.namespaces
|
||||
.aliases
|
||||
.iter()
|
||||
.map(|user| Regex::new(&user.regex))
|
||||
.collect::<Result<Vec<_>, _>>()?;
|
||||
let rooms = registration
|
||||
.namespaces
|
||||
.rooms
|
||||
.iter()
|
||||
.map(|user| Regex::new(&user.regex))
|
||||
.collect::<Result<Vec<_>, _>>()?;
|
||||
Ok(NamespaceCache { users, aliases, rooms })
|
||||
}
|
||||
}
|
||||
149
crates/matrix-sdk-appservice/src/virtual_user.rs
Normal file
149
crates/matrix-sdk-appservice/src/virtual_user.rs
Normal file
@@ -0,0 +1,149 @@
|
||||
// Copyright 2022 Famedly GmbH
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
//! Virtual users.
|
||||
|
||||
use matrix_sdk::{config::RequestConfig, Client, ClientBuildError, ClientBuilder, Session};
|
||||
use ruma::{
|
||||
api::client::{session::login, uiaa::UserIdentifier},
|
||||
assign, DeviceId, OwnedDeviceId, UserId,
|
||||
};
|
||||
use tracing::warn;
|
||||
|
||||
use crate::{AppService, Result};
|
||||
|
||||
/// Builder for a virtual user
|
||||
#[derive(Debug)]
|
||||
pub struct VirtualUserBuilder<'a> {
|
||||
appservice: &'a AppService,
|
||||
localpart: &'a str,
|
||||
device_id: Option<OwnedDeviceId>,
|
||||
client_builder: ClientBuilder,
|
||||
log_in: bool,
|
||||
restored_session: Option<Session>,
|
||||
}
|
||||
|
||||
impl<'a> VirtualUserBuilder<'a> {
|
||||
/// Create a new virtual user builder
|
||||
/// # Arguments
|
||||
///
|
||||
/// * `localpart` - The localpart of the virtual user
|
||||
pub fn new(appservice: &'a AppService, localpart: &'a str) -> Self {
|
||||
Self {
|
||||
appservice,
|
||||
localpart,
|
||||
device_id: None,
|
||||
client_builder: Client::builder(),
|
||||
log_in: false,
|
||||
restored_session: None,
|
||||
}
|
||||
}
|
||||
|
||||
/// Set the device ID of the virtual user
|
||||
pub fn device_id(mut self, device_id: Option<OwnedDeviceId>) -> Self {
|
||||
self.device_id = device_id;
|
||||
self
|
||||
}
|
||||
|
||||
/// Sets the client builder to use for the virtual user
|
||||
pub fn client_builder(mut self, client_builder: ClientBuilder) -> Self {
|
||||
self.client_builder = client_builder;
|
||||
self
|
||||
}
|
||||
|
||||
/// Log in as the virtual user
|
||||
///
|
||||
/// In some cases it is necessary to log in as the virtual user, such as to
|
||||
/// upload device keys
|
||||
pub fn login(mut self) -> Self {
|
||||
self.log_in = true;
|
||||
self
|
||||
}
|
||||
|
||||
/// Restore a persisted session
|
||||
///
|
||||
/// This is primarily useful if you enable
|
||||
/// [`VirtualUserBuilder::login()`] and want to restore a session
|
||||
/// from a previous run.
|
||||
pub fn restored_session(mut self, session: Session) -> Self {
|
||||
self.restored_session = Some(session);
|
||||
self
|
||||
}
|
||||
|
||||
/// Build the virtual user
|
||||
///
|
||||
/// # Errors
|
||||
/// This function returns an error if an invalid localpart is provided.
|
||||
pub async fn build(self) -> Result<Client> {
|
||||
if let Some(client) = self.appservice.clients.get(self.localpart) {
|
||||
return Ok(client.clone());
|
||||
}
|
||||
|
||||
let user_id = UserId::parse_with_server_name(self.localpart, &self.appservice.server_name)?;
|
||||
if !(self.appservice.user_id_is_in_namespace(&user_id)
|
||||
|| self.localpart == self.appservice.registration.sender_localpart)
|
||||
{
|
||||
warn!("Virtual client id '{user_id}' is not in the namespace")
|
||||
}
|
||||
|
||||
let mut builder = self.client_builder;
|
||||
|
||||
if !self.log_in && self.localpart != self.appservice.registration.sender_localpart {
|
||||
builder = builder.assert_identity();
|
||||
}
|
||||
|
||||
let client = builder
|
||||
.homeserver_url(self.appservice.homeserver_url.clone())
|
||||
.appservice_mode()
|
||||
.build()
|
||||
.await
|
||||
.map_err(ClientBuildError::assert_valid_builder_args)?;
|
||||
|
||||
let session = if let Some(session) = self.restored_session {
|
||||
session
|
||||
} else if self.log_in && self.localpart != self.appservice.registration.sender_localpart {
|
||||
let login_info =
|
||||
login::v3::LoginInfo::ApplicationService(login::v3::ApplicationService::new(
|
||||
UserIdentifier::UserIdOrLocalpart(self.localpart),
|
||||
));
|
||||
|
||||
let request = assign!(login::v3::Request::new(login_info), {
|
||||
device_id: self.device_id.as_ref().map(|v| v.as_ref()),
|
||||
initial_device_display_name: None,
|
||||
});
|
||||
|
||||
let response =
|
||||
client.send(request, Some(RequestConfig::short_retry().force_auth())).await?;
|
||||
|
||||
Session {
|
||||
access_token: response.access_token,
|
||||
user_id: response.user_id,
|
||||
device_id: response.device_id,
|
||||
}
|
||||
} else {
|
||||
// Don’t log in
|
||||
Session {
|
||||
access_token: self.appservice.registration.as_token.clone(),
|
||||
user_id: user_id.clone(),
|
||||
device_id: self.device_id.unwrap_or_else(DeviceId::new),
|
||||
}
|
||||
};
|
||||
|
||||
client.restore_login(session).await?;
|
||||
|
||||
self.appservice.clients.insert(self.localpart.to_owned(), client.clone());
|
||||
|
||||
Ok(client)
|
||||
}
|
||||
}
|
||||
@@ -125,6 +125,8 @@ async fn test_put_transaction_with_repeating_txn_id() -> Result<()> {
|
||||
#[allow(clippy::mutex_atomic)]
|
||||
let on_state_member = Arc::new(Mutex::new(false));
|
||||
appservice
|
||||
.virtual_user(None)
|
||||
.await?
|
||||
.register_event_handler({
|
||||
let on_state_member = on_state_member.clone();
|
||||
move |_ev: OriginalSyncRoomMemberEvent| {
|
||||
@@ -132,7 +134,7 @@ async fn test_put_transaction_with_repeating_txn_id() -> Result<()> {
|
||||
future::ready(())
|
||||
}
|
||||
})
|
||||
.await?;
|
||||
.await;
|
||||
|
||||
let status = warp::test::request()
|
||||
.method("PUT")
|
||||
@@ -279,6 +281,8 @@ async fn test_event_handler() -> Result<()> {
|
||||
#[allow(clippy::mutex_atomic)]
|
||||
let on_state_member = Arc::new(Mutex::new(false));
|
||||
appservice
|
||||
.virtual_user(None)
|
||||
.await?
|
||||
.register_event_handler({
|
||||
let on_state_member = on_state_member.clone();
|
||||
move |_ev: OriginalSyncRoomMemberEvent| {
|
||||
@@ -286,7 +290,7 @@ async fn test_event_handler() -> Result<()> {
|
||||
future::ready(())
|
||||
}
|
||||
})
|
||||
.await?;
|
||||
.await;
|
||||
|
||||
let uri = "/_matrix/app/v1/transactions/1?access_token=hs_token";
|
||||
|
||||
@@ -365,7 +369,8 @@ async fn test_appservice_on_sub_path() -> Result<()> {
|
||||
};
|
||||
|
||||
let members = appservice
|
||||
.get_cached_client(None)?
|
||||
.virtual_user(None)
|
||||
.await?
|
||||
.get_room(room_id)
|
||||
.expect("Expected room to be available")
|
||||
.members_no_sync()
|
||||
@@ -455,8 +460,8 @@ async fn test_receive_transaction() -> Result<()> {
|
||||
];
|
||||
let appservice = appservice(None, None).await?;
|
||||
|
||||
let alice = appservice.virtual_user_client("_appservice_alice").await?;
|
||||
let bob = appservice.virtual_user_client("_appservice_bob").await?;
|
||||
let alice = appservice.virtual_user(Some("_appservice_alice")).await?;
|
||||
let bob = appservice.virtual_user(Some("_appservice_bob")).await?;
|
||||
appservice
|
||||
.receive_transaction(push_events::v1::IncomingRequest::new("dontcare".into(), json))
|
||||
.await?;
|
||||
|
||||
Reference in New Issue
Block a user