From f8a75962193ec2a6fe60aa92aa24d427a1916a4c Mon Sep 17 00:00:00 2001 From: andrekir Date: Sun, 28 Jan 2024 07:51:36 -0300 Subject: [PATCH] feat: add JSON topic subscription to MQTT client --- .../mesh/repository/network/MQTTRepository.kt | 108 +++++++++--------- 1 file changed, 52 insertions(+), 56 deletions(-) diff --git a/app/src/main/java/com/geeksville/mesh/repository/network/MQTTRepository.kt b/app/src/main/java/com/geeksville/mesh/repository/network/MQTTRepository.kt index c447f58a4..860602f76 100644 --- a/app/src/main/java/com/geeksville/mesh/repository/network/MQTTRepository.kt +++ b/app/src/main/java/com/geeksville/mesh/repository/network/MQTTRepository.kt @@ -48,61 +48,6 @@ class MQTTRepository @Inject constructor( private var mqttClient: MqttAsyncClient? = null - suspend fun connect(callback: MqttCallbackExtended) { - val ownerId = radioConfigRepository.nodeDB.myId.value ?: generateClientId() - val channelSet = radioConfigRepository.channelSetFlow.first() - val mqttConfig = radioConfigRepository.moduleConfigFlow.first().mqtt - - val sslContext = SSLContext.getInstance("TLS") - // Create a custom SSLContext that trusts all certificates - sslContext.init(null, arrayOf(TrustAllX509TrustManager()), SecureRandom()) - - val stat = mqttConfig.root.ifEmpty { DEFAULT_TOPIC_ROOT } + STAT_TOPIC_LEVEL + ownerId - val connectOptions = MqttConnectOptions().apply { - userName = mqttConfig.username - password = mqttConfig.password.toCharArray() - isCleanSession = false // must be false to auto subscribe on reconnects - isAutomaticReconnect = true - if (mqttConfig.tlsEnabled) { - socketFactory = sslContext.socketFactory - } - setWill(stat, "offline".encodeToByteArray(), DEFAULT_QOS, true) - } - - val bufferOptions = DisconnectedBufferOptions().apply { - isBufferEnabled = true - bufferSize = 512 - isPersistBuffer = false - isDeleteOldestMessages = true - } - - val scheme = if (mqttConfig.tlsEnabled) "ssl" else "tcp" - val (host, port) = mqttConfig.address.ifEmpty { DEFAULT_SERVER_ADDRESS } - .split(":", limit = 2).let { it[0] to (it.getOrNull(1)?.toIntOrNull() ?: -1) } - - val serverURI: String = URI(scheme, null, host, port, "", "", "").toString() - - val topic = mqttConfig.root.ifEmpty { DEFAULT_TOPIC_ROOT } + - DEFAULT_TOPIC_LEVEL // FIXME if (mqttConfig.jsonEnabled) JSON_TOPIC_LEVEL else DEFAULT_TOPIC_LEVEL - - mqttClient = MqttAsyncClient( - serverURI, - ownerId, - MemoryPersistence(), - ) - mqttClient?.apply { - setCallback(callback) - setBufferOpts(bufferOptions) - connect(connectOptions).waitForCompletion() - - channelSet.subscribeList.forEach { globalId -> - val topicFilter = "$topic$globalId/#" - subscribe(topicFilter, DEFAULT_QOS).waitForCompletion() - info("MQTT Subscribed to topic: $topicFilter") - } - } - } - fun disconnect() { info("MQTT Disconnected") mqttClient?.apply { @@ -113,9 +58,42 @@ class MQTTRepository @Inject constructor( } val proxyMessageFlow: Flow = callbackFlow { + val ownerId = radioConfigRepository.nodeDB.myId.value ?: generateClientId() + val channelSet = radioConfigRepository.channelSetFlow.first() + val mqttConfig = radioConfigRepository.moduleConfigFlow.first().mqtt + + val sslContext = SSLContext.getInstance("TLS") + // Create a custom SSLContext that trusts all certificates + sslContext.init(null, arrayOf(TrustAllX509TrustManager()), SecureRandom()) + + val rootTopic = mqttConfig.root.ifEmpty { DEFAULT_TOPIC_ROOT } + val statTopic = "$rootTopic$STAT_TOPIC_LEVEL$ownerId" + + val connectOptions = MqttConnectOptions().apply { + userName = mqttConfig.username + password = mqttConfig.password.toCharArray() + isAutomaticReconnect = true + if (mqttConfig.tlsEnabled) { + socketFactory = sslContext.socketFactory + } + setWill(statTopic, "offline".encodeToByteArray(), DEFAULT_QOS, true) + } + + val bufferOptions = DisconnectedBufferOptions().apply { + isBufferEnabled = true + bufferSize = 512 + isPersistBuffer = false + isDeleteOldestMessages = true + } + val callback = object : MqttCallbackExtended { override fun connectComplete(reconnect: Boolean, serverURI: String) { info("MQTT connectComplete: $serverURI reconnect: $reconnect ") + channelSet.subscribeList.forEach { globalId -> + subscribe("$rootTopic$DEFAULT_TOPIC_LEVEL$globalId/#") + if (mqttConfig.jsonEnabled) subscribe("$rootTopic$JSON_TOPIC_LEVEL$globalId/#") + } + // publish(statTopic, "online".encodeToByteArray(), DEFAULT_QOS, true) } override fun connectionLost(cause: Throwable) { @@ -135,11 +113,29 @@ class MQTTRepository @Inject constructor( info("MQTT deliveryComplete messageId: ${token?.messageId}") } } - connect(callback) + + val scheme = if (mqttConfig.tlsEnabled) "ssl" else "tcp" + val (host, port) = mqttConfig.address.ifEmpty { DEFAULT_SERVER_ADDRESS } + .split(":", limit = 2).let { it[0] to (it.getOrNull(1)?.toIntOrNull() ?: -1) } + + mqttClient = MqttAsyncClient( + URI(scheme, null, host, port, "", "", "").toString(), + ownerId, + MemoryPersistence(), + ).apply { + setCallback(callback) + setBufferOpts(bufferOptions) + connect(connectOptions) + } awaitClose { disconnect() } } + private fun subscribe(topic: String) { + mqttClient?.subscribe(topic, DEFAULT_QOS) + info("MQTT Subscribed to topic: $topic") + } + fun publish(topic: String, data: ByteArray, retained: Boolean) { try { val token = mqttClient?.publish(topic, data, DEFAULT_QOS, retained)