diff --git a/app/src/main/java/com/geeksville/mesh/repository/datastore/ChannelSetRepository.kt b/app/src/main/java/com/geeksville/mesh/repository/datastore/ChannelSetRepository.kt index 40e7ce91c..d1978340e 100644 --- a/app/src/main/java/com/geeksville/mesh/repository/datastore/ChannelSetRepository.kt +++ b/app/src/main/java/com/geeksville/mesh/repository/datastore/ChannelSetRepository.kt @@ -8,7 +8,6 @@ import com.geeksville.mesh.ChannelProtos.ChannelSettings import com.geeksville.mesh.ConfigProtos import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.catch -import kotlinx.coroutines.flow.firstOrNull import java.io.IOException import javax.inject.Inject @@ -68,7 +67,4 @@ class ChannelSetRepository @Inject constructor( preference.toBuilder().setLoraConfig(config).build() } } - - suspend fun fetchInitialChannelSet() = channelSetStore.data.firstOrNull() - } diff --git a/app/src/main/java/com/geeksville/mesh/repository/datastore/LocalConfigRepository.kt b/app/src/main/java/com/geeksville/mesh/repository/datastore/LocalConfigRepository.kt index ab4d35646..3d7faf65f 100644 --- a/app/src/main/java/com/geeksville/mesh/repository/datastore/LocalConfigRepository.kt +++ b/app/src/main/java/com/geeksville/mesh/repository/datastore/LocalConfigRepository.kt @@ -6,7 +6,6 @@ import com.geeksville.mesh.ConfigProtos.Config import com.geeksville.mesh.LocalOnlyProtos.LocalConfig import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.catch -import kotlinx.coroutines.flow.first import java.io.IOException import javax.inject.Inject @@ -87,7 +86,4 @@ class LocalConfigRepository @Inject constructor( preference.toBuilder().setBluetooth(config).build() } } - - suspend fun fetchInitialLocalConfig() = localConfigStore.data.first() - } diff --git a/app/src/main/java/com/geeksville/mesh/repository/datastore/ModuleConfigRepository.kt b/app/src/main/java/com/geeksville/mesh/repository/datastore/ModuleConfigRepository.kt index bbc00303f..d8e510693 100644 --- a/app/src/main/java/com/geeksville/mesh/repository/datastore/ModuleConfigRepository.kt +++ b/app/src/main/java/com/geeksville/mesh/repository/datastore/ModuleConfigRepository.kt @@ -6,7 +6,6 @@ import com.geeksville.mesh.ModuleConfigProtos.ModuleConfig import com.geeksville.mesh.LocalOnlyProtos.LocalModuleConfig import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.catch -import kotlinx.coroutines.flow.first import java.io.IOException import javax.inject.Inject @@ -122,7 +121,4 @@ class ModuleConfigRepository @Inject constructor( preference.toBuilder().setDetectionSensor(config).build() } } - - suspend fun fetchInitialModuleConfig() = moduleConfigStore.data.first() - } diff --git a/app/src/main/java/com/geeksville/mesh/repository/network/MQTTRepository.kt b/app/src/main/java/com/geeksville/mesh/repository/network/MQTTRepository.kt index c4c4c1188..c447f58a4 100644 --- a/app/src/main/java/com/geeksville/mesh/repository/network/MQTTRepository.kt +++ b/app/src/main/java/com/geeksville/mesh/repository/network/MQTTRepository.kt @@ -4,13 +4,13 @@ import com.geeksville.mesh.MeshProtos.MqttClientProxyMessage import com.geeksville.mesh.android.Logging import com.geeksville.mesh.model.subscribeList import com.geeksville.mesh.mqttClientProxyMessage -import com.geeksville.mesh.repository.datastore.ChannelSetRepository -import com.geeksville.mesh.repository.datastore.ModuleConfigRepository +import com.geeksville.mesh.repository.datastore.RadioConfigRepository import com.geeksville.mesh.util.ignoreException import com.google.protobuf.ByteString import kotlinx.coroutines.channels.awaitClose import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.callbackFlow +import kotlinx.coroutines.flow.first import org.eclipse.paho.client.mqttv3.DisconnectedBufferOptions import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken import org.eclipse.paho.client.mqttv3.MqttAsyncClient @@ -28,8 +28,7 @@ import javax.net.ssl.TrustManager @Singleton class MQTTRepository @Inject constructor( - private val channelSetRepository: ChannelSetRepository, - private val moduleConfigRepository: ModuleConfigRepository, + private val radioConfigRepository: RadioConfigRepository, ) : Logging { companion object { @@ -50,14 +49,15 @@ class MQTTRepository @Inject constructor( private var mqttClient: MqttAsyncClient? = null suspend fun connect(callback: MqttCallbackExtended) { - val channelSet = channelSetRepository.fetchInitialChannelSet() ?: return - val mqttConfig = moduleConfigRepository.fetchInitialModuleConfig().mqtt + val ownerId = radioConfigRepository.nodeDB.myId.value ?: generateClientId() + val channelSet = radioConfigRepository.channelSetFlow.first() + val mqttConfig = radioConfigRepository.moduleConfigFlow.first().mqtt val sslContext = SSLContext.getInstance("TLS") // Create a custom SSLContext that trusts all certificates sslContext.init(null, arrayOf(TrustAllX509TrustManager()), SecureRandom()) - // val stat = mqttConfig.root.ifEmpty { DEFAULT_TOPIC_ROOT } + STAT_TOPIC_LEVEL + ownerId + val stat = mqttConfig.root.ifEmpty { DEFAULT_TOPIC_ROOT } + STAT_TOPIC_LEVEL + ownerId val connectOptions = MqttConnectOptions().apply { userName = mqttConfig.username password = mqttConfig.password.toCharArray() @@ -66,7 +66,7 @@ class MQTTRepository @Inject constructor( if (mqttConfig.tlsEnabled) { socketFactory = sslContext.socketFactory } - // setWill(stat, "offline".encodeToByteArray(), DEFAULT_QOS, true) + setWill(stat, "offline".encodeToByteArray(), DEFAULT_QOS, true) } val bufferOptions = DisconnectedBufferOptions().apply { @@ -87,7 +87,7 @@ class MQTTRepository @Inject constructor( mqttClient = MqttAsyncClient( serverURI, - generateClientId(), + ownerId, MemoryPersistence(), ) mqttClient?.apply { @@ -135,17 +135,9 @@ class MQTTRepository @Inject constructor( info("MQTT deliveryComplete messageId: ${token?.messageId}") } } + connect(callback) - try { - connect(callback) - } catch (ex: Exception) { - errormsg("MQTT Connect error: ${ex.message}") - close(ex) - } - - awaitClose { - disconnect() - } + awaitClose { disconnect() } } fun publish(topic: String, data: ByteArray, retained: Boolean) { @@ -156,8 +148,4 @@ class MQTTRepository @Inject constructor( errormsg("MQTT Publish error: ${ex.message}") } } - - fun publish(topic: String, message: String, retained: Boolean) { - publish(topic, message.encodeToByteArray(), retained) - } } diff --git a/app/src/main/java/com/geeksville/mesh/service/MeshService.kt b/app/src/main/java/com/geeksville/mesh/service/MeshService.kt index 5224ec15d..032291068 100644 --- a/app/src/main/java/com/geeksville/mesh/service/MeshService.kt +++ b/app/src/main/java/com/geeksville/mesh/service/MeshService.kt @@ -1376,7 +1376,7 @@ class MeshService : Service(), Logging { with(message) { when (payloadVariantCase) { MeshProtos.MqttClientProxyMessage.PayloadVariantCase.TEXT -> { - mqttRepository.publish(topic, text, retained) + mqttRepository.publish(topic, text.encodeToByteArray(), retained) } MeshProtos.MqttClientProxyMessage.PayloadVariantCase.DATA -> {