refactor: add user.id as unique mqtt client identifier

This commit is contained in:
andrekir
2023-12-27 19:48:40 -03:00
parent 55a0714ee7
commit f73e7fd301
5 changed files with 12 additions and 36 deletions

View File

@@ -8,7 +8,6 @@ import com.geeksville.mesh.ChannelProtos.ChannelSettings
import com.geeksville.mesh.ConfigProtos
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.firstOrNull
import java.io.IOException
import javax.inject.Inject
@@ -68,7 +67,4 @@ class ChannelSetRepository @Inject constructor(
preference.toBuilder().setLoraConfig(config).build()
}
}
suspend fun fetchInitialChannelSet() = channelSetStore.data.firstOrNull()
}

View File

@@ -6,7 +6,6 @@ import com.geeksville.mesh.ConfigProtos.Config
import com.geeksville.mesh.LocalOnlyProtos.LocalConfig
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.first
import java.io.IOException
import javax.inject.Inject
@@ -87,7 +86,4 @@ class LocalConfigRepository @Inject constructor(
preference.toBuilder().setBluetooth(config).build()
}
}
suspend fun fetchInitialLocalConfig() = localConfigStore.data.first()
}

View File

@@ -6,7 +6,6 @@ import com.geeksville.mesh.ModuleConfigProtos.ModuleConfig
import com.geeksville.mesh.LocalOnlyProtos.LocalModuleConfig
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.first
import java.io.IOException
import javax.inject.Inject
@@ -122,7 +121,4 @@ class ModuleConfigRepository @Inject constructor(
preference.toBuilder().setDetectionSensor(config).build()
}
}
suspend fun fetchInitialModuleConfig() = moduleConfigStore.data.first()
}

View File

@@ -4,13 +4,13 @@ import com.geeksville.mesh.MeshProtos.MqttClientProxyMessage
import com.geeksville.mesh.android.Logging
import com.geeksville.mesh.model.subscribeList
import com.geeksville.mesh.mqttClientProxyMessage
import com.geeksville.mesh.repository.datastore.ChannelSetRepository
import com.geeksville.mesh.repository.datastore.ModuleConfigRepository
import com.geeksville.mesh.repository.datastore.RadioConfigRepository
import com.geeksville.mesh.util.ignoreException
import com.google.protobuf.ByteString
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.first
import org.eclipse.paho.client.mqttv3.DisconnectedBufferOptions
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken
import org.eclipse.paho.client.mqttv3.MqttAsyncClient
@@ -28,8 +28,7 @@ import javax.net.ssl.TrustManager
@Singleton
class MQTTRepository @Inject constructor(
private val channelSetRepository: ChannelSetRepository,
private val moduleConfigRepository: ModuleConfigRepository,
private val radioConfigRepository: RadioConfigRepository,
) : Logging {
companion object {
@@ -50,14 +49,15 @@ class MQTTRepository @Inject constructor(
private var mqttClient: MqttAsyncClient? = null
suspend fun connect(callback: MqttCallbackExtended) {
val channelSet = channelSetRepository.fetchInitialChannelSet() ?: return
val mqttConfig = moduleConfigRepository.fetchInitialModuleConfig().mqtt
val ownerId = radioConfigRepository.nodeDB.myId.value ?: generateClientId()
val channelSet = radioConfigRepository.channelSetFlow.first()
val mqttConfig = radioConfigRepository.moduleConfigFlow.first().mqtt
val sslContext = SSLContext.getInstance("TLS")
// Create a custom SSLContext that trusts all certificates
sslContext.init(null, arrayOf<TrustManager>(TrustAllX509TrustManager()), SecureRandom())
// val stat = mqttConfig.root.ifEmpty { DEFAULT_TOPIC_ROOT } + STAT_TOPIC_LEVEL + ownerId
val stat = mqttConfig.root.ifEmpty { DEFAULT_TOPIC_ROOT } + STAT_TOPIC_LEVEL + ownerId
val connectOptions = MqttConnectOptions().apply {
userName = mqttConfig.username
password = mqttConfig.password.toCharArray()
@@ -66,7 +66,7 @@ class MQTTRepository @Inject constructor(
if (mqttConfig.tlsEnabled) {
socketFactory = sslContext.socketFactory
}
// setWill(stat, "offline".encodeToByteArray(), DEFAULT_QOS, true)
setWill(stat, "offline".encodeToByteArray(), DEFAULT_QOS, true)
}
val bufferOptions = DisconnectedBufferOptions().apply {
@@ -87,7 +87,7 @@ class MQTTRepository @Inject constructor(
mqttClient = MqttAsyncClient(
serverURI,
generateClientId(),
ownerId,
MemoryPersistence(),
)
mqttClient?.apply {
@@ -135,17 +135,9 @@ class MQTTRepository @Inject constructor(
info("MQTT deliveryComplete messageId: ${token?.messageId}")
}
}
connect(callback)
try {
connect(callback)
} catch (ex: Exception) {
errormsg("MQTT Connect error: ${ex.message}")
close(ex)
}
awaitClose {
disconnect()
}
awaitClose { disconnect() }
}
fun publish(topic: String, data: ByteArray, retained: Boolean) {
@@ -156,8 +148,4 @@ class MQTTRepository @Inject constructor(
errormsg("MQTT Publish error: ${ex.message}")
}
}
fun publish(topic: String, message: String, retained: Boolean) {
publish(topic, message.encodeToByteArray(), retained)
}
}

View File

@@ -1376,7 +1376,7 @@ class MeshService : Service(), Logging {
with(message) {
when (payloadVariantCase) {
MeshProtos.MqttClientProxyMessage.PayloadVariantCase.TEXT -> {
mqttRepository.publish(topic, text, retained)
mqttRepository.publish(topic, text.encodeToByteArray(), retained)
}
MeshProtos.MqttClientProxyMessage.PayloadVariantCase.DATA -> {