Device register route and integrate key manager on bootstrap

This commit is contained in:
Ericson Soares
2024-08-19 20:38:07 -03:00
parent 4421a7f823
commit 5eb4fa6d3b
7 changed files with 320 additions and 101 deletions

View File

@@ -34,7 +34,7 @@ pub struct CloudServices {
http_client: ClientWithMiddleware,
domain_name: String,
pub token_refresher: TokenRefresher,
pub key_manager: Option<Arc<KeyManager>>,
key_manager: Arc<RwLock<Option<Arc<KeyManager>>>>,
}
impl CloudServices {
@@ -90,7 +90,7 @@ impl CloudServices {
get_cloud_api_address,
http_client,
domain_name,
key_manager: None,
key_manager: Arc::default(),
})
}
@@ -180,6 +180,23 @@ impl CloudServices {
Ok(client)
}
pub async fn set_key_manager(&self, key_manager: KeyManager) {
self.key_manager
.write()
.await
.replace(Arc::new(key_manager));
}
pub async fn key_manager(&self) -> Result<Arc<KeyManager>, Error> {
self.key_manager
.read()
.await
.as_ref()
.map_or(Err(Error::KeyManagerNotInitialized), |key_manager| {
Ok(Arc::clone(key_manager))
})
}
}
#[cfg(test)]

View File

@@ -46,6 +46,8 @@ pub enum Error {
source: sd_crypto::Error,
context: &'static str,
},
#[error("Key manager not initialized")]
KeyManagerNotInitialized,
}
#[derive(thiserror::Error, Debug)]

View File

@@ -26,9 +26,9 @@ pub struct KeyStore {
}
impl KeyStore {
pub fn new(rng: &mut CryptoRng) -> Self {
pub fn new(iroh_secret_key: IrohSecretKey) -> Self {
Self {
iroh_secret_key: IrohSecretKey::generate_with_rng(rng),
iroh_secret_key,
keys_by_hash: HashMap::new(),
}
}

View File

@@ -10,7 +10,7 @@ use std::{
};
use iroh_base::key::{NodeId, SecretKey as IrohSecretKey};
use tokio::{fs, io, sync::RwLock};
use tokio::{fs, sync::RwLock};
mod key_store;
@@ -27,33 +27,18 @@ pub struct KeyManager {
impl KeyManager {
pub async fn new(
master_key: SecretKey,
iroh_secret_key: IrohSecretKey,
data_directory: impl AsRef<Path> + Send,
rng: &mut CryptoRng,
) -> Result<Self, Error> {
async fn inner(
master_key: SecretKey,
iroh_secret_key: IrohSecretKey,
keys_file_path: PathBuf,
rng: &mut CryptoRng,
) -> Result<KeyManager, Error> {
let store = match fs::metadata(&keys_file_path).await {
Ok(metadata) => KeyStore::decrypt(&master_key, metadata, &keys_file_path).await?,
Err(e) if e.kind() == io::ErrorKind::NotFound => {
// File not found, so we create a new one
let store = KeyStore::new(rng);
store.encrypt(&master_key, rng, &keys_file_path).await?;
store
}
Err(e) => {
return Err(FileIOError::from((
keys_file_path,
e,
"Failed to read space keys file",
))
.into());
}
};
let store = KeyStore::new(iroh_secret_key);
store.encrypt(&master_key, rng, &keys_file_path).await?;
Ok(KeyManager {
master_key,
@@ -62,7 +47,44 @@ impl KeyManager {
})
}
inner(master_key, data_directory.as_ref().join(KEY_FILE_NAME), rng).await
inner(
master_key,
iroh_secret_key,
data_directory.as_ref().join(KEY_FILE_NAME),
rng,
)
.await
}
pub async fn load(
master_key: SecretKey,
data_directory: impl AsRef<Path> + Send,
) -> Result<Self, Error> {
async fn inner(
master_key: SecretKey,
keys_file_path: PathBuf,
) -> Result<KeyManager, Error> {
Ok(KeyManager {
store: RwLock::new(
KeyStore::decrypt(
&master_key,
fs::metadata(&keys_file_path).await.map_err(|e| {
FileIOError::from((
&keys_file_path,
e,
"Failed to read space keys file",
))
})?,
&keys_file_path,
)
.await?,
),
master_key,
keys_file_path,
})
}
inner(master_key, data_directory.as_ref().join(KEY_FILE_NAME)).await
}
pub async fn iroh_secret_key(&self) -> IrohSecretKey {

View File

@@ -37,6 +37,7 @@ mod token_refresher;
pub use cloud_client::CloudServices;
pub use error::{Error, GetTokenError};
pub use key_manager::KeyManager;
// Re-exports
pub use iroh_base::key::{NodeId, SecretKey as IrohSecretKey};

View File

@@ -1,16 +1,21 @@
use crate::{api::{Ctx, R}, node::HardwareModel};
use crate::{
api::{Ctx, R},
node::HardwareModel,
};
use futures::{SinkExt, StreamExt};
use sd_cloud_schema::{
auth::AccessToken,
devices::{self, DeviceOS, PubId},
opaque_ke::{
rand::rngs::OsRng, ClientLogin, ClientLoginFinishParameters, ClientLoginFinishResult,
ClientLoginStartResult,
ClientLogin, ClientLoginFinishParameters, ClientLoginFinishResult, ClientLoginStartResult,
ClientRegistration, ClientRegistrationFinishParameters, ClientRegistrationFinishResult,
ClientRegistrationStartResult,
},
Client, Service, SpacedriveCipherSuite,
};
use sd_core_cloud_services::QuinnConnection;
use sd_core_cloud_services::{NodeId, QuinnConnection};
use sd_crypto::{cloud::secret_key::SecretKey, CryptoRng};
use blake3::Hash;
use chrono::DateTime;
@@ -198,20 +203,19 @@ pub async fn hello(
access_token: AccessToken,
device_pub_id: PubId,
hashed_pub_id: Hash,
) -> Result<(), rspc::Error> {
rng: &mut CryptoRng,
) -> Result<SecretKey, rspc::Error> {
use devices::hello::{Request, RequestUpdate, Response, State};
let ClientLoginStartResult { message, state } = ClientLogin::<SpacedriveCipherSuite>::start(
&mut OsRng,
hashed_pub_id.as_bytes().as_slice(),
)
.map_err(|e| {
error!(?e, "OPAQUE error initializing device hello request;");
rspc::Error::new(
rspc::ErrorCode::InternalServerError,
"Failed to initialize device login".into(),
)
})?;
let ClientLoginStartResult { message, state } =
ClientLogin::<SpacedriveCipherSuite>::start(rng, hashed_pub_id.as_bytes().as_slice())
.map_err(|e| {
error!(?e, "OPAQUE error initializing device hello request;");
rspc::Error::new(
rspc::ErrorCode::InternalServerError,
"Failed to initialize device login".into(),
)
})?;
let (mut hello_continuation, mut res_stream) = handle_comm_error(
client
@@ -234,9 +238,9 @@ pub async fn hello(
));
};
let login_response =
let credential_response =
match handle_comm_error(res, "Communication error on device hello response;")? {
Ok(Response(State::LoginResponse(login_response))) => login_response,
Ok(Response(State::LoginResponse(credential_response))) => credential_response,
Ok(Response(State::End)) => {
unreachable!("Device hello response MUST not be End here, this is a serious bug and should crash;");
}
@@ -253,7 +257,7 @@ pub async fn hello(
} = state
.finish(
hashed_pub_id.as_bytes().as_slice(),
*login_response,
*credential_response,
ClientLoginFinishParameters::default(),
)
.map_err(|e| {
@@ -290,16 +294,146 @@ pub async fn hello(
Ok(Response(State::LoginResponse(_))) => {
unreachable!("Device hello final response MUST be End here, this is a serious bug and should crash;");
}
Ok(Response(State::End)) => {}
Ok(Response(State::End)) => {
// Protocol completed successfully
Ok(SecretKey::new(export_key.as_slice().try_into().expect(
"Key mismatch between OPAQUE and crypto crate; this is a serious bug and should crash;",
)))
}
Err(e) => {
error!(?e, "Device hello final response error;");
return Err(e.into());
Err(e.into())
}
}
}
pub struct DeviceRegisterData {
pub pub_id: PubId,
pub name: String,
pub os: DeviceOS,
pub storage_size: u64,
pub connection_id: NodeId,
}
pub async fn register(
client: &Client<QuinnConnection<Service>, Service>,
access_token: AccessToken,
DeviceRegisterData {
pub_id,
name,
os,
storage_size,
connection_id,
}: DeviceRegisterData,
hashed_pub_id: Hash,
rng: &mut CryptoRng,
) -> Result<SecretKey, rspc::Error> {
use devices::register::{Request, RequestUpdate, Response, State};
let ClientRegistrationStartResult { message, state } =
ClientRegistration::<SpacedriveCipherSuite>::start(
rng,
hashed_pub_id.as_bytes().as_slice(),
)
.map_err(|e| {
error!(?e, "OPAQUE error initializing device register request;");
rspc::Error::new(
rspc::ErrorCode::InternalServerError,
"Failed to initialize device register".into(),
)
})?;
let (mut register_continuation, mut res_stream) = handle_comm_error(
client
.devices()
.register(Request {
access_token,
pub_id,
name,
os,
storage_size,
connection_id,
opaque_register_message: Box::new(message),
})
.await,
"Failed to send device register request;",
)?;
let Some(res) = res_stream.next().await else {
let message = "Server did not send a device register response;";
error!("{message}");
return Err(rspc::Error::new(
rspc::ErrorCode::InternalServerError,
message.to_string(),
));
};
Ok(())
}
let registration_response =
match handle_comm_error(res, "Communication error on device register response;")? {
Ok(Response(State::RegistrationResponse(res))) => res,
Ok(Response(State::End)) => {
unreachable!("Device hello response MUST not be End here, this is a serious bug and should crash;");
}
Err(e) => {
error!(?e, "Device hello response error;");
return Err(e.into());
}
};
pub async fn device_registration() -> Result<(), rspc::Error> {
Ok(())
let ClientRegistrationFinishResult {
message,
export_key,
..
} = state
.finish(
rng,
hashed_pub_id.as_bytes().as_slice(),
*registration_response,
ClientRegistrationFinishParameters::default(),
)
.map_err(|e| {
error!(?e, "Device register finish error;");
rspc::Error::new(
rspc::ErrorCode::InternalServerError,
"Failed to finish device register".into(),
)
})?;
register_continuation
.send(RequestUpdate {
opaque_registration_finish: Box::new(message),
})
.await
.map_err(|e| {
error!(?e, "Failed to send device register request continuation;");
rspc::Error::new(
rspc::ErrorCode::InternalServerError,
"Failed to finish device register procedure;".into(),
)
})?;
let Some(res) = res_stream.next().await else {
let message = "Server did not send a device register END response;";
error!("{message}");
return Err(rspc::Error::new(
rspc::ErrorCode::InternalServerError,
message.to_string(),
));
};
match handle_comm_error(res, "Communication error on device register response;")? {
Ok(Response(State::RegistrationResponse(_))) => {
unreachable!("Device register final response MUST be End here, this is a serious bug and should crash;");
}
Ok(Response(State::End)) => {
// Protocol completed successfully
Ok(SecretKey::new(export_key.as_slice().try_into().expect(
"Key mismatch between OPAQUE and crypto crate; this is a serious bug and should crash;",
)))
}
Err(e) => {
error!(?e, "Device register final response error;");
Err(e.into())
}
}
}

View File

@@ -1,11 +1,12 @@
use crate::Node;
use crate::{node::config::NodeConfig, volume::get_volumes, Node};
use sd_cloud_schema::{
auth,
error::{ClientSideError, Error},
users, Client, Service,
};
use sd_core_cloud_services::QuinnConnection;
use sd_core_cloud_services::{IrohSecretKey, KeyManager, QuinnConnection};
use sd_crypto::{CryptoRng, SeedableRng};
use rspc::alpha::AlphaRouter;
use tracing::error;
@@ -35,64 +36,106 @@ pub(crate) fn mount() -> AlphaRouter<Ctx> {
.merge("libraries.", libraries::mount())
.merge("locations.", locations::mount())
.merge("devices.", devices::mount())
// .procedure("bootstrap", {
// R.mutation(|node, access_token: auth::AccessToken| async move {
// use sd_cloud_schema::devices;
.procedure("bootstrap", {
R.mutation(
|node, (access_token, refresh_token): (auth::AccessToken, auth::RefreshToken)| async move {
use sd_cloud_schema::devices;
// let client = try_get_cloud_services_client(&node).await?;
node.cloud_services
.token_refresher
.init(access_token.clone(), refresh_token)
.await?;
// // create user route is idempotent, so we can safely keep creating the same user over and over
// handle_comm_error(
// client
// .users()
// .create(users::create::Request {
// access_token: access_token.clone(),
// })
// .await,
// "Failed to create user;",
// )??;
let client = try_get_cloud_services_client(&node).await?;
let data_directory = node.config.data_directory();
// let device_pub_id = devices::PubId(node.config.get().await.id);
// let mut hasher = blake3::Hasher::new();
// hasher.update(device_pub_id.0.as_bytes().as_slice());
// let hashed_pub_id = hasher.finalize();
let mut rng =
CryptoRng::from_seed(node.master_rng.lock().await.generate_fixed());
// match handle_comm_error(
// client
// .devices()
// .get(devices::get::Request {
// access_token: access_token.clone(),
// pub_id: device_pub_id,
// })
// .await,
// "Failed to get device on cloud bootstrap;",
// )? {
// Ok(_) => {
// // Device registered, we execute a device hello flow
// self::devices::hello(&client, access_token, device_pub_id, hashed_pub_id)
// .await
// }
// Err(Error::Client(ClientSideError::NotFound(_))) => {
// // Device not registered, we execute a device register flow
// todo!()
// }
// Err(e) => return Err(e.into()),
// }
// create user route is idempotent, so we can safely keep creating the same user over and over
handle_comm_error(
client
.users()
.create(users::create::Request {
access_token: access_token.clone(),
})
.await,
"Failed to create user;",
)??;
// // TODO: figure out a way to know if we need to register the device or send a device hello request
let (device_pub_id, name, os) = {
let NodeConfig { id, name, os, .. } = node.config.get().await;
(devices::PubId(id), name, os)
};
let mut hasher = blake3::Hasher::new();
hasher.update(device_pub_id.0.as_bytes().as_slice());
let hashed_pub_id = hasher.finalize();
// // TODO: in case of a device register request, we use the OPAQUE key to encrypt iroh's secret key (NodeId)
// // and save on data directory
let key_manager = match handle_comm_error(
client
.devices()
.get(devices::get::Request {
access_token: access_token.clone(),
pub_id: device_pub_id,
})
.await,
"Failed to get device on cloud bootstrap;",
)? {
Ok(_) => {
// Device registered, we execute a device hello flow
let master_key = self::devices::hello(
&client,
access_token,
device_pub_id,
hashed_pub_id,
&mut rng,
)
.await?;
// // TODO: in case of a device hello request, we use the OPAQUE key to decrypt iroh's secret key (NodeId)
// // and keep it in memory
KeyManager::load(master_key, data_directory).await?
}
Err(Error::Client(ClientSideError::NotFound(_))) => {
// Device not registered, we execute a device register flow
let iroh_secret_key = IrohSecretKey::generate_with_rng(&mut rng);
// // TODO: With this device iroh's secret key (NodeId) now known and we can start the iroh
// // node for cloud p2p
let master_key = self::devices::register(
&client,
access_token,
self::devices::DeviceRegisterData {
pub_id: device_pub_id,
name,
os,
// TODO(@fogodev): We should use storage statistics from sqlite db
storage_size: get_volumes()
.await
.into_iter()
.map(|volume| volume.total_capacity)
.sum(),
connection_id: iroh_secret_key.public(),
},
hashed_pub_id,
&mut rng,
)
.await?;
// Ok(())
// })
// })
KeyManager::new(master_key, iroh_secret_key, data_directory, &mut rng)
.await?
}
Err(e) => return Err(e.into()),
};
let iroh_secret_key = key_manager.iroh_secret_key().await;
node.cloud_services.set_key_manager(key_manager).await;
// TODO: With this device iroh's secret key (NodeId) now known and we can start the iroh
// node for cloud p2p
todo!("Start iroh node for cloud p2p");
Ok(())
},
)
})
}
fn handle_comm_error<T, E: std::error::Error + std::fmt::Debug + Send + Sync + 'static>(