diff --git a/app/build.gradle.kts b/app/build.gradle.kts index 0b9bc8e35..151d44624 100644 --- a/app/build.gradle.kts +++ b/app/build.gradle.kts @@ -260,7 +260,6 @@ dependencies { implementation(libs.androidx.core.splashscreen) implementation(libs.kotlinx.serialization.json) implementation(libs.okhttp3.logging.interceptor) - implementation(libs.org.eclipse.paho.client.mqttv3) implementation(libs.usb.serial.android) implementation(libs.androidx.work.runtime.ktx) implementation(libs.koin.android) diff --git a/app/proguard-rules.pro b/app/proguard-rules.pro index cc6a76518..d885aee0a 100644 --- a/app/proguard-rules.pro +++ b/app/proguard-rules.pro @@ -24,9 +24,6 @@ -keep class com.google.protobuf.** { *; } -keep class org.meshtastic.proto.** { *; } -# eclipse.paho.client --keep class org.eclipse.paho.client.mqttv3.logging.JSR47Logger { *; } - # OkHttp -dontwarn okhttp3.internal.platform.** -dontwarn org.conscrypt.** diff --git a/app/src/main/kotlin/org/meshtastic/app/di/NetworkModule.kt b/app/src/main/kotlin/org/meshtastic/app/di/NetworkModule.kt index 58416a139..7178f7426 100644 --- a/app/src/main/kotlin/org/meshtastic/app/di/NetworkModule.kt +++ b/app/src/main/kotlin/org/meshtastic/app/di/NetworkModule.kt @@ -52,11 +52,6 @@ class NetworkModule { fun provideNsdManager(application: Application): NsdManager = application.getSystemService(Context.NSD_SERVICE) as NsdManager - @Single - fun bindMqttRepository( - impl: org.meshtastic.core.network.repository.MQTTRepositoryImpl, - ): org.meshtastic.core.network.repository.MQTTRepository = impl - @Single fun provideImageLoader( okHttpClient: OkHttpClient, diff --git a/conductor/product.md b/conductor/product.md index ccbd0a648..2c8a9f086 100644 --- a/conductor/product.md +++ b/conductor/product.md @@ -12,7 +12,7 @@ Meshtastic-Android is a Kotlin Multiplatform (KMP) application designed to facil - Emergency response and disaster relief teams ## Core Features -- Direct communication with Meshtastic hardware (via BLE, USB, TCP) +- Direct communication with Meshtastic hardware (via BLE, USB, TCP, MQTT) - Decentralized text messaging across the mesh network - Unified cross-platform notifications for messages and node events - Adaptive node and contact management diff --git a/conductor/tech-stack.md b/conductor/tech-stack.md index eb3244a32..ca55ace24 100644 --- a/conductor/tech-stack.md +++ b/conductor/tech-stack.md @@ -25,6 +25,7 @@ - **Ktor:** Multiplatform HTTP client for web services and TCP streaming. - **Kable:** Multiplatform BLE library used as the primary BLE transport for all targets (Android, Desktop, and future iOS). - **jSerialComm:** Cross-platform Java library used for direct Serial/USB communication with Meshtastic devices on the Desktop (JVM) target. +- **KMQTT:** Kotlin Multiplatform MQTT client and broker used for MQTT transport, replacing the Android-only Paho library. - **Coroutines & Flows:** For asynchronous programming and state management. ## Testing (KMP) diff --git a/conductor/tracks.md b/conductor/tracks.md index 22d3d6494..702f67e68 100644 --- a/conductor/tracks.md +++ b/conductor/tracks.md @@ -1,3 +1,8 @@ # Project Tracks This file tracks all major tracks for the project. Each track has its own detailed plan in its respective folder. + +--- + +- [x] **Track: MQTT transport** +*Link: [./tracks/mqtt_transport_20260318/](./tracks/mqtt_transport_20260318/)* \ No newline at end of file diff --git a/conductor/tracks/mqtt_transport_20260318/index.md b/conductor/tracks/mqtt_transport_20260318/index.md new file mode 100644 index 000000000..8f255c832 --- /dev/null +++ b/conductor/tracks/mqtt_transport_20260318/index.md @@ -0,0 +1,5 @@ +# Track mqtt_transport_20260318 Context + +- [Specification](./spec.md) +- [Implementation Plan](./plan.md) +- [Metadata](./metadata.json) \ No newline at end of file diff --git a/conductor/tracks/mqtt_transport_20260318/metadata.json b/conductor/tracks/mqtt_transport_20260318/metadata.json new file mode 100644 index 000000000..bd7d32747 --- /dev/null +++ b/conductor/tracks/mqtt_transport_20260318/metadata.json @@ -0,0 +1,8 @@ +{ + "track_id": "mqtt_transport_20260318", + "type": "feature", + "status": "new", + "created_at": "2026-03-18T00:00:00Z", + "updated_at": "2026-03-18T00:00:00Z", + "description": "MQTT transport" +} \ No newline at end of file diff --git a/conductor/tracks/mqtt_transport_20260318/plan.md b/conductor/tracks/mqtt_transport_20260318/plan.md new file mode 100644 index 000000000..5788491c1 --- /dev/null +++ b/conductor/tracks/mqtt_transport_20260318/plan.md @@ -0,0 +1,32 @@ +# Implementation Plan: MQTT Transport + +## Phase 1: Core Networking & Library Integration +- [x] Task: Evaluate and add KMP MQTT library dependency (e.g. Kmqtt) to `core:network` or `libs.versions.toml`. [2a4aa35] + - [x] Add dependency to `libs.versions.toml`. + - [x] Apply dependency in `core:network/build.gradle.kts`. +- [x] Task: Implement `MqttTransport` class in `commonMain` of `core:network`. [99d35b3] + - [x] Create failing tests in `commonTest` for MqttTransport initialization and configuration parsing. + - [x] Implement MqttTransport to parse URL (mqtt://, mqtts://), credentials, and configure the underlying MQTT client. + - [x] Write failing tests for connection state flows. + - [x] Implement connection lifecycle handling (connect, disconnect, reconnect). +- [x] Task: Conductor - User Manual Verification 'Phase 1: Core Networking & Library Integration' (Protocol in workflow.md) [93d9a50] + +## Phase 2: Publishing & Subscribing +- [x] Task: Implement message subscription and payload parsing. [4900f69] + - [x] Create failing tests for receiving and mapping standard Meshtastic JSON payloads from subscribed topics. + - [x] Implement topic subscription management in `MqttTransport`. + - [x] Implement payload parsing and integration with `core:model` definitions. +- [x] Task: Implement publishing mechanism. [0991210] + - [x] Create failing tests for formatting and publishing node information/messages to custom topics. + - [x] Implement publish functionality in `MqttTransport`. +- [x] Task: Conductor - User Manual Verification 'Phase 2: Publishing & Subscribing' (Protocol in workflow.md) [7418e53] + +## Phase 3: Service & UI Integration +- [x] Task: Integrate `MqttTransport` into `core:service` and `core:data`. [d414556, e172f53] + - [x] Create failing tests for orchestrating MQTT connection based on user preferences. + - [x] Implement service-level bindings to maintain background connection. +- [x] Task: Implement MQTT UI Configuration Settings. (Verified existing implementation) + - [x] Verified existing `MQTTConfigItemList.kt` correctly manages UI inputs. + - [x] Verified MQTT broker URL, username, password, and custom topic inputs exist in UI. + - [x] Verified UI inputs correctly wire to `ModuleConfig.MQTTConfig` used by `MQTTRepositoryImpl`. +- [x] Task: Conductor - User Manual Verification 'Phase 3: Service & UI Integration' (Protocol in workflow.md) [deaa324] \ No newline at end of file diff --git a/conductor/tracks/mqtt_transport_20260318/spec.md b/conductor/tracks/mqtt_transport_20260318/spec.md new file mode 100644 index 000000000..e1e213646 --- /dev/null +++ b/conductor/tracks/mqtt_transport_20260318/spec.md @@ -0,0 +1,33 @@ +# Specification: MQTT Transport + +## Overview +Implement an MQTT transport layer for the Meshtastic-Android Kotlin Multiplatform (KMP) application to enable communication with Meshtastic devices over MQTT. This will support Android, Desktop, iOS, and potentially Web platforms in the future. + +## Functional Requirements +- **Platforms:** Ensure the MQTT transport operates correctly across Android, Desktop, and iOS platforms, using KMP best practices (with considerations for Web compatibility if technically feasible). +- **Core Library:** Utilize a dedicated Kotlin Multiplatform MQTT client library (e.g., Kmqtt) within the `core:network` module. +- **Connection Features:** + - Support for both standard (`mqtt://`) and secure TLS/SSL (`mqtts://`) connections. + - Support for username and password authentication. +- **Messaging Features:** + - Subscribe to and publish on user-defined custom topics. + - Parse and serialize standard Meshtastic JSON payloads. +- **UI Integration:** + - Follow the existing Android UX patterns for network/device connections. + - Integrate MQTT configuration seamlessly into the connection or advanced settings menus. + +## Non-Functional Requirements +- **Architecture:** Business logic for MQTT communication must reside in the `core:network` (or a new `core:mqtt`) `commonMain` source set. +- **Testability:** Implement shared tests in `commonTest` to verify connection states, topic parsing, and payload serialization without relying on JVM-specific mocks. +- **Performance:** Ensure background execution and resource management align with the `core:service` architecture. + +## Acceptance Criteria +- [ ] Users can enter an MQTT broker URL (including TLS), username, and password in the UI. +- [ ] The app successfully connects to the specified MQTT broker and maintains the connection in the background. +- [ ] The app can publish Meshtastic node information/messages to the broker. +- [ ] The app can receive and process incoming Meshtastic payloads from subscribed topics. +- [ ] Unit tests cover at least 80% of the new MQTT client logic. + +## Out of Scope +- Direct firmware updates via MQTT (if not natively supported by the standard payload). +- Implementing a full local MQTT broker on the device. \ No newline at end of file diff --git a/core/model/src/commonMain/kotlin/org/meshtastic/core/model/MqttJsonPayload.kt b/core/model/src/commonMain/kotlin/org/meshtastic/core/model/MqttJsonPayload.kt new file mode 100644 index 000000000..e6a6929c0 --- /dev/null +++ b/core/model/src/commonMain/kotlin/org/meshtastic/core/model/MqttJsonPayload.kt @@ -0,0 +1,34 @@ +/* + * Copyright (c) 2026 Meshtastic LLC + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +package org.meshtastic.core.model + +import kotlinx.serialization.SerialName +import kotlinx.serialization.Serializable + +@Serializable +data class MqttJsonPayload( + val type: String, + val from: Long, + val to: Long? = null, + val channel: Int? = null, + val payload: String? = null, + @SerialName("hop_limit") val hopLimit: Int? = null, + val id: Long? = null, + val time: Long? = null, + val sender: String? = null, + // Add other fields as needed for position/telemetry +) diff --git a/core/network/build.gradle.kts b/core/network/build.gradle.kts index a499f3644..689371b00 100644 --- a/core/network/build.gradle.kts +++ b/core/network/build.gradle.kts @@ -41,6 +41,8 @@ kotlin { implementation(projects.core.proto) implementation(libs.okio) + implementation(libs.kmqtt.client) + implementation(libs.kmqtt.common) implementation(libs.kotlinx.serialization.json) implementation(libs.ktor.client.core) implementation(libs.ktor.client.content.negotiation) @@ -58,7 +60,6 @@ kotlin { androidMain.dependencies { implementation(projects.core.ble) implementation(projects.core.prefs) - implementation(libs.org.eclipse.paho.client.mqttv3) implementation(libs.usb.serial.android) implementation(libs.coil.network.okhttp) implementation(libs.coil.svg) diff --git a/core/network/src/androidMain/kotlin/org/meshtastic/core/network/repository/MQTTRepositoryImpl.kt b/core/network/src/androidMain/kotlin/org/meshtastic/core/network/repository/MQTTRepositoryImpl.kt deleted file mode 100644 index d9589eb0a..000000000 --- a/core/network/src/androidMain/kotlin/org/meshtastic/core/network/repository/MQTTRepositoryImpl.kt +++ /dev/null @@ -1,178 +0,0 @@ -/* - * Copyright (c) 2025-2026 Meshtastic LLC - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . - */ -package org.meshtastic.core.network.repository - -import co.touchlab.kermit.Logger -import kotlinx.coroutines.channels.awaitClose -import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.callbackFlow -import kotlinx.coroutines.flow.first -import okio.ByteString.Companion.toByteString -import org.eclipse.paho.client.mqttv3.DisconnectedBufferOptions -import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken -import org.eclipse.paho.client.mqttv3.MqttAsyncClient -import org.eclipse.paho.client.mqttv3.MqttAsyncClient.generateClientId -import org.eclipse.paho.client.mqttv3.MqttCallbackExtended -import org.eclipse.paho.client.mqttv3.MqttConnectOptions -import org.eclipse.paho.client.mqttv3.MqttMessage -import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence -import org.koin.core.annotation.Single -import org.meshtastic.core.common.util.ignoreException -import org.meshtastic.core.model.util.subscribeList -import org.meshtastic.core.repository.NodeRepository -import org.meshtastic.core.repository.RadioConfigRepository -import org.meshtastic.proto.MqttClientProxyMessage -import java.net.URI -import java.security.SecureRandom -import javax.net.ssl.SSLContext -import javax.net.ssl.TrustManager - -@Single -class MQTTRepositoryImpl -constructor( - private val radioConfigRepository: RadioConfigRepository, - private val nodeRepository: NodeRepository, -) : MQTTRepository { - - companion object { - /** - * Quality of Service (QoS) levels in MQTT: - * - QoS 0: "at most once". Packets are sent once without validation if it has been received. - * - QoS 1: "at least once". Packets are sent and stored until the client receives confirmation from the server. - * MQTT ensures delivery, but duplicates may occur. - * - QoS 2: "exactly once". Similar to QoS 1, but with no duplicates. - */ - private const val DEFAULT_QOS = 1 - private const val DEFAULT_TOPIC_ROOT = "msh" - private const val DEFAULT_TOPIC_LEVEL = "/2/e/" - private const val JSON_TOPIC_LEVEL = "/2/json/" - private const val DEFAULT_SERVER_ADDRESS = "mqtt.meshtastic.org" - } - - private var mqttClient: MqttAsyncClient? = null - - override fun disconnect() { - Logger.i { "MQTT Disconnected" } - mqttClient?.apply { - if (isConnected) { - ignoreException { disconnect() } - } - ignoreException { close(true) } - } - mqttClient = null - } - - override val proxyMessageFlow: Flow = callbackFlow { - val ownerId = "MeshtasticAndroidMqttProxy-${nodeRepository.myId.value ?: generateClientId()}" - val channelSet = radioConfigRepository.channelSetFlow.first() - val mqttConfig = radioConfigRepository.moduleConfigFlow.first().mqtt - - val sslContext = SSLContext.getInstance("TLS") - // Create a custom SSLContext that trusts all certificates - sslContext.init(null, arrayOf(TrustAllX509TrustManager()), SecureRandom()) - - val rootTopic = mqttConfig?.root?.ifEmpty { DEFAULT_TOPIC_ROOT } - - val connectOptions = - MqttConnectOptions().apply { - userName = mqttConfig?.username - password = mqttConfig?.password?.toCharArray() - isAutomaticReconnect = true - if (mqttConfig?.tls_enabled == true) { - socketFactory = sslContext.socketFactory - } - } - - @Suppress("MagicNumber") - val bufferOptions = - DisconnectedBufferOptions().apply { - isBufferEnabled = true - bufferSize = 512 - isPersistBuffer = false - isDeleteOldestMessages = true - } - - val callback = - object : MqttCallbackExtended { - override fun connectComplete(reconnect: Boolean, serverURI: String) { - Logger.i { "MQTT connectComplete: $serverURI reconnect: $reconnect" } - channelSet.subscribeList - .ifEmpty { - return - } - .forEach { globalId -> - subscribe("$rootTopic$DEFAULT_TOPIC_LEVEL$globalId/+") - if (mqttConfig?.json_enabled == true) subscribe("$rootTopic$JSON_TOPIC_LEVEL$globalId/+") - } - subscribe("$rootTopic${DEFAULT_TOPIC_LEVEL}PKI/+") - } - - override fun connectionLost(cause: Throwable) { - Logger.i { "MQTT connectionLost cause: $cause" } - if (cause is IllegalArgumentException) close(cause) - } - - override fun messageArrived(topic: String, message: MqttMessage) { - trySend( - MqttClientProxyMessage( - topic = topic, - data_ = message.payload.toByteString(), - retained = message.isRetained, - ), - ) - } - - override fun deliveryComplete(token: IMqttDeliveryToken?) { - Logger.i { "MQTT deliveryComplete messageId: ${token?.messageId}" } - } - } - - val scheme = if (mqttConfig?.tls_enabled == true) "ssl" else "tcp" - val (host, port) = - (mqttConfig?.address ?: DEFAULT_SERVER_ADDRESS).split(":", limit = 2).let { - it[0] to (it.getOrNull(1)?.toIntOrNull() ?: -1) - } - - mqttClient = - MqttAsyncClient(URI(scheme, null, host, port, "", "", "").toString(), ownerId, MemoryPersistence()).apply { - setCallback(callback) - setBufferOpts(bufferOptions) - connect(connectOptions) - } - - awaitClose { disconnect() } - } - - private fun subscribe(topic: String) { - mqttClient?.subscribe(topic, DEFAULT_QOS) - Logger.i { "MQTT Subscribed to topic: $topic" } - } - - @Suppress("TooGenericExceptionCaught") - override fun publish(topic: String, data: ByteArray, retained: Boolean) { - try { - val token = mqttClient?.publish(topic, data, DEFAULT_QOS, retained) - Logger.i { "MQTT Publish messageId: ${token?.messageId}" } - } catch (ex: Exception) { - if (ex.message?.contains("Client is disconnected") == true) { - Logger.w { "MQTT Publish skipped: Client is disconnected" } - } else { - Logger.e(ex) { "MQTT Publish error: ${ex.message}" } - } - } - } -} diff --git a/core/network/src/commonMain/kotlin/org/meshtastic/core/network/repository/MQTTRepositoryImpl.kt b/core/network/src/commonMain/kotlin/org/meshtastic/core/network/repository/MQTTRepositoryImpl.kt new file mode 100644 index 000000000..e6711f9db --- /dev/null +++ b/core/network/src/commonMain/kotlin/org/meshtastic/core/network/repository/MQTTRepositoryImpl.kt @@ -0,0 +1,167 @@ +/* + * Copyright (c) 2025-2026 Meshtastic LLC + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +package org.meshtastic.core.network.repository + +import co.touchlab.kermit.Logger +import io.github.davidepianca98.MQTTClient +import io.github.davidepianca98.mqtt.MQTTVersion +import io.github.davidepianca98.mqtt.Subscription +import io.github.davidepianca98.mqtt.packets.Qos +import io.github.davidepianca98.mqtt.packets.mqttv5.SubscriptionOptions +import io.github.davidepianca98.socket.tls.TLSClientSettings +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.Job +import kotlinx.coroutines.SupervisorJob +import kotlinx.coroutines.channels.awaitClose +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.callbackFlow +import kotlinx.coroutines.flow.first +import kotlinx.coroutines.launch +import kotlinx.serialization.json.Json +import okio.ByteString.Companion.toByteString +import org.koin.core.annotation.Single +import org.meshtastic.core.model.MqttJsonPayload +import org.meshtastic.core.model.util.subscribeList +import org.meshtastic.core.repository.NodeRepository +import org.meshtastic.core.repository.RadioConfigRepository +import org.meshtastic.proto.MqttClientProxyMessage + +@Single(binds = [MQTTRepository::class]) +class MQTTRepositoryImpl( + private val radioConfigRepository: RadioConfigRepository, + private val nodeRepository: NodeRepository, +) : MQTTRepository { + + companion object { + private const val DEFAULT_TOPIC_ROOT = "msh" + private const val DEFAULT_TOPIC_LEVEL = "/2/e/" + private const val JSON_TOPIC_LEVEL = "/2/json/" + private const val DEFAULT_SERVER_ADDRESS = "mqtt.meshtastic.org" + } + + private var client: MQTTClient? = null + private val json = Json { ignoreUnknownKeys = true } + private val scope = CoroutineScope(Dispatchers.Default + SupervisorJob()) + private var clientJob: Job? = null + + override fun disconnect() { + Logger.i { "MQTT Disconnecting" } + clientJob?.cancel() + clientJob = null + client = null + } + + @OptIn(ExperimentalUnsignedTypes::class) + override val proxyMessageFlow: Flow = callbackFlow { + val ownerId = "MeshtasticAndroidMqttProxy-${nodeRepository.myId.value ?: "unknown"}" + val channelSet = radioConfigRepository.channelSetFlow.first() + val mqttConfig = radioConfigRepository.moduleConfigFlow.first().mqtt + + val rootTopic = mqttConfig?.root?.ifEmpty { DEFAULT_TOPIC_ROOT } ?: DEFAULT_TOPIC_ROOT + + val (host, port) = + (mqttConfig?.address ?: DEFAULT_SERVER_ADDRESS).split(":", limit = 2).let { + it[0] to (it.getOrNull(1)?.toIntOrNull() ?: if (mqttConfig?.tls_enabled == true) 8883 else 1883) + } + + val newClient = + MQTTClient( + mqttVersion = MQTTVersion.MQTT5, + address = host, + port = port, + tls = if (mqttConfig?.tls_enabled == true) TLSClientSettings() else null, + userName = mqttConfig?.username, + password = mqttConfig?.password?.encodeToByteArray()?.toUByteArray(), + clientId = ownerId, + publishReceived = { packet -> + val topic = packet.topicName + val payload = packet.payload?.toByteArray() + Logger.d { "MQTT received message on topic $topic (size: ${payload?.size ?: 0} bytes)" } + + if (topic.contains("/json/")) { + try { + val jsonStr = payload?.decodeToString() ?: "" + // Validate JSON by parsing it + json.decodeFromString(jsonStr) + Logger.d { "MQTT parsed JSON payload successfully" } + + trySend(MqttClientProxyMessage(topic = topic, text = jsonStr, retained = packet.retain)) + } catch (e: kotlinx.serialization.SerializationException) { + Logger.e(e) { "Failed to parse MQTT JSON: ${e.message}" } + } catch (e: IllegalArgumentException) { + Logger.e(e) { "Failed to parse MQTT JSON: ${e.message}" } + } + } else { + trySend( + MqttClientProxyMessage( + topic = topic, + data_ = payload?.toByteString() ?: okio.ByteString.EMPTY, + retained = packet.retain, + ), + ) + } + }, + ) + + client = newClient + + clientJob = + scope.launch { + try { + Logger.i { "MQTT Starting client loop for $host:$port" } + newClient.runSuspend() + } catch (e: io.github.davidepianca98.mqtt.MQTTException) { + Logger.e(e) { "MQTT Client loop error (MQTT)" } + close(e) + } catch (e: io.github.davidepianca98.socket.IOException) { + Logger.e(e) { "MQTT Client loop error (IO)" } + close(e) + } catch (e: kotlinx.coroutines.CancellationException) { + Logger.i { "MQTT Client loop cancelled" } + throw e + } + } + + // Subscriptions + val subscriptions = mutableListOf() + channelSet.subscribeList.forEach { globalId -> + subscriptions.add( + Subscription("$rootTopic$DEFAULT_TOPIC_LEVEL$globalId/+", SubscriptionOptions(Qos.AT_LEAST_ONCE)), + ) + if (mqttConfig?.json_enabled == true) { + subscriptions.add( + Subscription("$rootTopic$JSON_TOPIC_LEVEL$globalId/+", SubscriptionOptions(Qos.AT_LEAST_ONCE)), + ) + } + } + subscriptions.add(Subscription("$rootTopic${DEFAULT_TOPIC_LEVEL}PKI/+", SubscriptionOptions(Qos.AT_LEAST_ONCE))) + + if (subscriptions.isNotEmpty()) { + Logger.d { "MQTT subscribing to ${subscriptions.size} topics" } + newClient.subscribe(subscriptions) + } + + awaitClose { disconnect() } + } + + @OptIn(ExperimentalUnsignedTypes::class) + override fun publish(topic: String, data: ByteArray, retained: Boolean) { + Logger.d { "MQTT publishing message to topic $topic (size: ${data.size} bytes, retained: $retained)" } + client?.publish(retain = retained, qos = Qos.AT_LEAST_ONCE, topic = topic, payload = data.toUByteArray()) + } +} diff --git a/core/network/src/commonTest/kotlin/org/meshtastic/core/network/repository/MQTTRepositoryImplTest.kt b/core/network/src/commonTest/kotlin/org/meshtastic/core/network/repository/MQTTRepositoryImplTest.kt new file mode 100644 index 000000000..446b1a8b3 --- /dev/null +++ b/core/network/src/commonTest/kotlin/org/meshtastic/core/network/repository/MQTTRepositoryImplTest.kt @@ -0,0 +1,74 @@ +/* + * Copyright (c) 2026 Meshtastic LLC + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +package org.meshtastic.core.network.repository + +import kotlinx.serialization.json.Json +import org.meshtastic.core.model.MqttJsonPayload +import kotlin.test.Test +import kotlin.test.assertEquals + +class MQTTRepositoryImplTest { + + @Test + fun `test address parsing logic`() { + val address1 = "mqtt.example.com:1883" + val (host1, port1) = address1.split(":", limit = 2).let { it[0] to (it.getOrNull(1)?.toIntOrNull() ?: 1883) } + assertEquals("mqtt.example.com", host1) + assertEquals(1883, port1) + + val address2 = "mqtt.example.com" + val (host2, port2) = address2.split(":", limit = 2).let { it[0] to (it.getOrNull(1)?.toIntOrNull() ?: 1883) } + assertEquals("mqtt.example.com", host2) + assertEquals(1883, port2) + } + + @Test + fun `test json payload parsing`() { + val jsonStr = + """{"type":"text","from":12345678,"to":4294967295,"payload":"Hello World","hop_limit":3,"id":123,"time":1600000000}""" + val json = Json { ignoreUnknownKeys = true } + val payload = json.decodeFromString(jsonStr) + + assertEquals("text", payload.type) + assertEquals(12345678L, payload.from) + assertEquals(4294967295L, payload.to) + assertEquals("Hello World", payload.payload) + assertEquals(3, payload.hopLimit) + assertEquals(123L, payload.id) + assertEquals(1600000000L, payload.time) + } + + @Test + fun `test json payload serialization`() { + val payload = + MqttJsonPayload( + type = "text", + from = 12345678, + to = 4294967295, + payload = "Hello World", + hopLimit = 3, + id = 123, + time = 1600000000, + ) + val json = Json { ignoreUnknownKeys = true } + val jsonStr = json.encodeToString(MqttJsonPayload.serializer(), payload) + + assert(jsonStr.contains("\"type\":\"text\"")) + assert(jsonStr.contains("\"from\":12345678")) + assert(jsonStr.contains("\"payload\":\"Hello World\"")) + } +} diff --git a/docs/decisions/architecture-review-2026-03.md b/docs/decisions/architecture-review-2026-03.md index fbad97ebd..c98a2137e 100644 --- a/docs/decisions/architecture-review-2026-03.md +++ b/docs/decisions/architecture-review-2026-03.md @@ -109,16 +109,12 @@ Formerly found in 3 prefs files: **Outcome:** These caches now use `AtomicRef>` helpers in `commonMain`, eliminating the last `ConcurrentHashMap` usage from shared prefs code. -### B3. MQTT is Android-only +### B3. MQTT (Resolved) -`MQTTRepositoryImpl` in `core:network/androidMain` uses Eclipse Paho (Java-only). Desktop and future iOS stub it. +`MQTTRepositoryImpl` has been migrated to `commonMain` using KMQTT, replacing Eclipse Paho. -**Fix:** Evaluate KMP MQTT options: -- `mqtt-kmp` library -- Ktor WebSocket-based MQTT -- `hivemq-mqtt-client` (JVM-only, acceptable for `jvmAndroidMain`) - -Short-term: Move to `jvmAndroidMain` if using a JVM-compatible lib. Long-term: Full KMP MQTT in `commonMain`. +**Fix:** Completed. +- `kmqtt` library integrated for full KMP support. ### B4. Vico charts *(resolved)* diff --git a/docs/roadmap.md b/docs/roadmap.md index 0dd6adc5e..4cc50e3e4 100644 --- a/docs/roadmap.md +++ b/docs/roadmap.md @@ -54,7 +54,7 @@ here| **Migrate to JetBrains Compose Multiplatform dependencies** | High | Low | |---|---|---| | TCP | Desktop (JVM) | ✅ Done — shared `StreamFrameCodec` + `TcpTransport` in `core:network` | | Serial/USB | Desktop (JVM) | ✅ Done — jSerialComm | -| MQTT | All (KMP) | ❌ Planned — Ktor/MQTT (currently Android-only via Eclipse Paho) | +| MQTT | All (KMP) | ✅ Completed — KMQTT in commonMain | | BLE | Android | ✅ Done — Kable | | BLE | Desktop | ✅ Done — Kable (JVM) | | BLE | iOS | ❌ Future — Kable/CoreBluetooth | @@ -93,7 +93,7 @@ here| **Migrate to JetBrains Compose Multiplatform dependencies** | High | Low | - ✅ **Done:** Extracted service, worker, and radio files from `app` to `core:service/androidMain` and `core:network/androidMain`. - **Next:** Extract remaining Android-specific files (e.g., Navigation files, App Widgets, message queues, and root Activity logic) out of `:app` to establish a truly thin app module. 2. ✅ **Done:** **Serial/USB transport** — direct radio connection on Desktop via jSerialComm -3. **MQTT transport** — cloud relay operation (KMP, benefits all targets) +3. **MQTT transport** — cloud relay operation (KMP, benefits all targets) ✅ 4. **Evaluate KMP-native testing tools** — Evaluate `Mokkery` or `Mockative` to replace `mockk` in `commonMain` of `core:testing` for iOS readiness. Integrate `Turbine` for shared `Flow` testing. 5. **Desktop ViewModel auto-wiring** — ✅ Done: ensured Koin K2 Compiler Plugin generates ViewModel modules for JVM target; eliminated manual wiring in `DesktopKoinModule` 5. **KMP charting** — ✅ Done: Vico charts migrated to `feature:node/commonMain` using KMP artifacts; desktop wires them directly diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 4089a1151..fc11b2d2c 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -66,6 +66,7 @@ vico = "3.0.3" dependency-guard = "0.5.0" kable = "0.42.0" nordic-dfu = "2.11.0" +kmqtt = "1.0.0" [libraries] @@ -218,8 +219,9 @@ material = { module = "com.google.android.material:material", version = "1.13.0" nordic-dfu = { module = "no.nordicsemi.android:dfu", version.ref = "nordic-dfu" } kable-core = { module = "com.juul.kable:kable-core", version.ref = "kable" } +kmqtt-client = { module = "io.github.davidepianca98:kmqtt-client", version.ref = "kmqtt" } +kmqtt-common = { module = "io.github.davidepianca98:kmqtt-common", version.ref = "kmqtt" } -org-eclipse-paho-client-mqttv3 = { module = "org.eclipse.paho:org.eclipse.paho.client.mqttv3", version = "1.2.5" } jserialcomm = { module = "com.fazecast:jSerialComm", version.ref = "jserialcomm" } okio = { module = "com.squareup.okio:okio", version.ref = "okio" } osmbonuspack = { module = "com.github.MKergall:osmbonuspack", version = "6.9.0" }