From d314ee2d8a3cb3a7d2867fc91dde2d066eb4a019 Mon Sep 17 00:00:00 2001
From: James Rich <2199651+jamesarich@users.noreply.github.com>
Date: Wed, 18 Mar 2026 13:39:20 -0500
Subject: [PATCH] feat: mqtt (#4841)
---
app/build.gradle.kts | 1 -
app/proguard-rules.pro | 3 -
.../org/meshtastic/app/di/NetworkModule.kt | 5 -
conductor/product.md | 2 +-
conductor/tech-stack.md | 1 +
conductor/tracks.md | 5 +
.../tracks/mqtt_transport_20260318/index.md | 5 +
.../mqtt_transport_20260318/metadata.json | 8 +
.../tracks/mqtt_transport_20260318/plan.md | 32 ++++
.../tracks/mqtt_transport_20260318/spec.md | 33 ++++
.../meshtastic/core/model/MqttJsonPayload.kt | 34 ++++
core/network/build.gradle.kts | 3 +-
.../network/repository/MQTTRepositoryImpl.kt | 178 ------------------
.../network/repository/MQTTRepositoryImpl.kt | 167 ++++++++++++++++
.../repository/MQTTRepositoryImplTest.kt | 74 ++++++++
docs/decisions/architecture-review-2026-03.md | 12 +-
docs/roadmap.md | 4 +-
gradle/libs.versions.toml | 4 +-
18 files changed, 371 insertions(+), 200 deletions(-)
create mode 100644 conductor/tracks/mqtt_transport_20260318/index.md
create mode 100644 conductor/tracks/mqtt_transport_20260318/metadata.json
create mode 100644 conductor/tracks/mqtt_transport_20260318/plan.md
create mode 100644 conductor/tracks/mqtt_transport_20260318/spec.md
create mode 100644 core/model/src/commonMain/kotlin/org/meshtastic/core/model/MqttJsonPayload.kt
delete mode 100644 core/network/src/androidMain/kotlin/org/meshtastic/core/network/repository/MQTTRepositoryImpl.kt
create mode 100644 core/network/src/commonMain/kotlin/org/meshtastic/core/network/repository/MQTTRepositoryImpl.kt
create mode 100644 core/network/src/commonTest/kotlin/org/meshtastic/core/network/repository/MQTTRepositoryImplTest.kt
diff --git a/app/build.gradle.kts b/app/build.gradle.kts
index 0b9bc8e35..151d44624 100644
--- a/app/build.gradle.kts
+++ b/app/build.gradle.kts
@@ -260,7 +260,6 @@ dependencies {
implementation(libs.androidx.core.splashscreen)
implementation(libs.kotlinx.serialization.json)
implementation(libs.okhttp3.logging.interceptor)
- implementation(libs.org.eclipse.paho.client.mqttv3)
implementation(libs.usb.serial.android)
implementation(libs.androidx.work.runtime.ktx)
implementation(libs.koin.android)
diff --git a/app/proguard-rules.pro b/app/proguard-rules.pro
index cc6a76518..d885aee0a 100644
--- a/app/proguard-rules.pro
+++ b/app/proguard-rules.pro
@@ -24,9 +24,6 @@
-keep class com.google.protobuf.** { *; }
-keep class org.meshtastic.proto.** { *; }
-# eclipse.paho.client
--keep class org.eclipse.paho.client.mqttv3.logging.JSR47Logger { *; }
-
# OkHttp
-dontwarn okhttp3.internal.platform.**
-dontwarn org.conscrypt.**
diff --git a/app/src/main/kotlin/org/meshtastic/app/di/NetworkModule.kt b/app/src/main/kotlin/org/meshtastic/app/di/NetworkModule.kt
index 58416a139..7178f7426 100644
--- a/app/src/main/kotlin/org/meshtastic/app/di/NetworkModule.kt
+++ b/app/src/main/kotlin/org/meshtastic/app/di/NetworkModule.kt
@@ -52,11 +52,6 @@ class NetworkModule {
fun provideNsdManager(application: Application): NsdManager =
application.getSystemService(Context.NSD_SERVICE) as NsdManager
- @Single
- fun bindMqttRepository(
- impl: org.meshtastic.core.network.repository.MQTTRepositoryImpl,
- ): org.meshtastic.core.network.repository.MQTTRepository = impl
-
@Single
fun provideImageLoader(
okHttpClient: OkHttpClient,
diff --git a/conductor/product.md b/conductor/product.md
index ccbd0a648..2c8a9f086 100644
--- a/conductor/product.md
+++ b/conductor/product.md
@@ -12,7 +12,7 @@ Meshtastic-Android is a Kotlin Multiplatform (KMP) application designed to facil
- Emergency response and disaster relief teams
## Core Features
-- Direct communication with Meshtastic hardware (via BLE, USB, TCP)
+- Direct communication with Meshtastic hardware (via BLE, USB, TCP, MQTT)
- Decentralized text messaging across the mesh network
- Unified cross-platform notifications for messages and node events
- Adaptive node and contact management
diff --git a/conductor/tech-stack.md b/conductor/tech-stack.md
index eb3244a32..ca55ace24 100644
--- a/conductor/tech-stack.md
+++ b/conductor/tech-stack.md
@@ -25,6 +25,7 @@
- **Ktor:** Multiplatform HTTP client for web services and TCP streaming.
- **Kable:** Multiplatform BLE library used as the primary BLE transport for all targets (Android, Desktop, and future iOS).
- **jSerialComm:** Cross-platform Java library used for direct Serial/USB communication with Meshtastic devices on the Desktop (JVM) target.
+- **KMQTT:** Kotlin Multiplatform MQTT client and broker used for MQTT transport, replacing the Android-only Paho library.
- **Coroutines & Flows:** For asynchronous programming and state management.
## Testing (KMP)
diff --git a/conductor/tracks.md b/conductor/tracks.md
index 22d3d6494..702f67e68 100644
--- a/conductor/tracks.md
+++ b/conductor/tracks.md
@@ -1,3 +1,8 @@
# Project Tracks
This file tracks all major tracks for the project. Each track has its own detailed plan in its respective folder.
+
+---
+
+- [x] **Track: MQTT transport**
+*Link: [./tracks/mqtt_transport_20260318/](./tracks/mqtt_transport_20260318/)*
\ No newline at end of file
diff --git a/conductor/tracks/mqtt_transport_20260318/index.md b/conductor/tracks/mqtt_transport_20260318/index.md
new file mode 100644
index 000000000..8f255c832
--- /dev/null
+++ b/conductor/tracks/mqtt_transport_20260318/index.md
@@ -0,0 +1,5 @@
+# Track mqtt_transport_20260318 Context
+
+- [Specification](./spec.md)
+- [Implementation Plan](./plan.md)
+- [Metadata](./metadata.json)
\ No newline at end of file
diff --git a/conductor/tracks/mqtt_transport_20260318/metadata.json b/conductor/tracks/mqtt_transport_20260318/metadata.json
new file mode 100644
index 000000000..bd7d32747
--- /dev/null
+++ b/conductor/tracks/mqtt_transport_20260318/metadata.json
@@ -0,0 +1,8 @@
+{
+ "track_id": "mqtt_transport_20260318",
+ "type": "feature",
+ "status": "new",
+ "created_at": "2026-03-18T00:00:00Z",
+ "updated_at": "2026-03-18T00:00:00Z",
+ "description": "MQTT transport"
+}
\ No newline at end of file
diff --git a/conductor/tracks/mqtt_transport_20260318/plan.md b/conductor/tracks/mqtt_transport_20260318/plan.md
new file mode 100644
index 000000000..5788491c1
--- /dev/null
+++ b/conductor/tracks/mqtt_transport_20260318/plan.md
@@ -0,0 +1,32 @@
+# Implementation Plan: MQTT Transport
+
+## Phase 1: Core Networking & Library Integration
+- [x] Task: Evaluate and add KMP MQTT library dependency (e.g. Kmqtt) to `core:network` or `libs.versions.toml`. [2a4aa35]
+ - [x] Add dependency to `libs.versions.toml`.
+ - [x] Apply dependency in `core:network/build.gradle.kts`.
+- [x] Task: Implement `MqttTransport` class in `commonMain` of `core:network`. [99d35b3]
+ - [x] Create failing tests in `commonTest` for MqttTransport initialization and configuration parsing.
+ - [x] Implement MqttTransport to parse URL (mqtt://, mqtts://), credentials, and configure the underlying MQTT client.
+ - [x] Write failing tests for connection state flows.
+ - [x] Implement connection lifecycle handling (connect, disconnect, reconnect).
+- [x] Task: Conductor - User Manual Verification 'Phase 1: Core Networking & Library Integration' (Protocol in workflow.md) [93d9a50]
+
+## Phase 2: Publishing & Subscribing
+- [x] Task: Implement message subscription and payload parsing. [4900f69]
+ - [x] Create failing tests for receiving and mapping standard Meshtastic JSON payloads from subscribed topics.
+ - [x] Implement topic subscription management in `MqttTransport`.
+ - [x] Implement payload parsing and integration with `core:model` definitions.
+- [x] Task: Implement publishing mechanism. [0991210]
+ - [x] Create failing tests for formatting and publishing node information/messages to custom topics.
+ - [x] Implement publish functionality in `MqttTransport`.
+- [x] Task: Conductor - User Manual Verification 'Phase 2: Publishing & Subscribing' (Protocol in workflow.md) [7418e53]
+
+## Phase 3: Service & UI Integration
+- [x] Task: Integrate `MqttTransport` into `core:service` and `core:data`. [d414556, e172f53]
+ - [x] Create failing tests for orchestrating MQTT connection based on user preferences.
+ - [x] Implement service-level bindings to maintain background connection.
+- [x] Task: Implement MQTT UI Configuration Settings. (Verified existing implementation)
+ - [x] Verified existing `MQTTConfigItemList.kt` correctly manages UI inputs.
+ - [x] Verified MQTT broker URL, username, password, and custom topic inputs exist in UI.
+ - [x] Verified UI inputs correctly wire to `ModuleConfig.MQTTConfig` used by `MQTTRepositoryImpl`.
+- [x] Task: Conductor - User Manual Verification 'Phase 3: Service & UI Integration' (Protocol in workflow.md) [deaa324]
\ No newline at end of file
diff --git a/conductor/tracks/mqtt_transport_20260318/spec.md b/conductor/tracks/mqtt_transport_20260318/spec.md
new file mode 100644
index 000000000..e1e213646
--- /dev/null
+++ b/conductor/tracks/mqtt_transport_20260318/spec.md
@@ -0,0 +1,33 @@
+# Specification: MQTT Transport
+
+## Overview
+Implement an MQTT transport layer for the Meshtastic-Android Kotlin Multiplatform (KMP) application to enable communication with Meshtastic devices over MQTT. This will support Android, Desktop, iOS, and potentially Web platforms in the future.
+
+## Functional Requirements
+- **Platforms:** Ensure the MQTT transport operates correctly across Android, Desktop, and iOS platforms, using KMP best practices (with considerations for Web compatibility if technically feasible).
+- **Core Library:** Utilize a dedicated Kotlin Multiplatform MQTT client library (e.g., Kmqtt) within the `core:network` module.
+- **Connection Features:**
+ - Support for both standard (`mqtt://`) and secure TLS/SSL (`mqtts://`) connections.
+ - Support for username and password authentication.
+- **Messaging Features:**
+ - Subscribe to and publish on user-defined custom topics.
+ - Parse and serialize standard Meshtastic JSON payloads.
+- **UI Integration:**
+ - Follow the existing Android UX patterns for network/device connections.
+ - Integrate MQTT configuration seamlessly into the connection or advanced settings menus.
+
+## Non-Functional Requirements
+- **Architecture:** Business logic for MQTT communication must reside in the `core:network` (or a new `core:mqtt`) `commonMain` source set.
+- **Testability:** Implement shared tests in `commonTest` to verify connection states, topic parsing, and payload serialization without relying on JVM-specific mocks.
+- **Performance:** Ensure background execution and resource management align with the `core:service` architecture.
+
+## Acceptance Criteria
+- [ ] Users can enter an MQTT broker URL (including TLS), username, and password in the UI.
+- [ ] The app successfully connects to the specified MQTT broker and maintains the connection in the background.
+- [ ] The app can publish Meshtastic node information/messages to the broker.
+- [ ] The app can receive and process incoming Meshtastic payloads from subscribed topics.
+- [ ] Unit tests cover at least 80% of the new MQTT client logic.
+
+## Out of Scope
+- Direct firmware updates via MQTT (if not natively supported by the standard payload).
+- Implementing a full local MQTT broker on the device.
\ No newline at end of file
diff --git a/core/model/src/commonMain/kotlin/org/meshtastic/core/model/MqttJsonPayload.kt b/core/model/src/commonMain/kotlin/org/meshtastic/core/model/MqttJsonPayload.kt
new file mode 100644
index 000000000..e6a6929c0
--- /dev/null
+++ b/core/model/src/commonMain/kotlin/org/meshtastic/core/model/MqttJsonPayload.kt
@@ -0,0 +1,34 @@
+/*
+ * Copyright (c) 2026 Meshtastic LLC
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see .
+ */
+package org.meshtastic.core.model
+
+import kotlinx.serialization.SerialName
+import kotlinx.serialization.Serializable
+
+@Serializable
+data class MqttJsonPayload(
+ val type: String,
+ val from: Long,
+ val to: Long? = null,
+ val channel: Int? = null,
+ val payload: String? = null,
+ @SerialName("hop_limit") val hopLimit: Int? = null,
+ val id: Long? = null,
+ val time: Long? = null,
+ val sender: String? = null,
+ // Add other fields as needed for position/telemetry
+)
diff --git a/core/network/build.gradle.kts b/core/network/build.gradle.kts
index a499f3644..689371b00 100644
--- a/core/network/build.gradle.kts
+++ b/core/network/build.gradle.kts
@@ -41,6 +41,8 @@ kotlin {
implementation(projects.core.proto)
implementation(libs.okio)
+ implementation(libs.kmqtt.client)
+ implementation(libs.kmqtt.common)
implementation(libs.kotlinx.serialization.json)
implementation(libs.ktor.client.core)
implementation(libs.ktor.client.content.negotiation)
@@ -58,7 +60,6 @@ kotlin {
androidMain.dependencies {
implementation(projects.core.ble)
implementation(projects.core.prefs)
- implementation(libs.org.eclipse.paho.client.mqttv3)
implementation(libs.usb.serial.android)
implementation(libs.coil.network.okhttp)
implementation(libs.coil.svg)
diff --git a/core/network/src/androidMain/kotlin/org/meshtastic/core/network/repository/MQTTRepositoryImpl.kt b/core/network/src/androidMain/kotlin/org/meshtastic/core/network/repository/MQTTRepositoryImpl.kt
deleted file mode 100644
index d9589eb0a..000000000
--- a/core/network/src/androidMain/kotlin/org/meshtastic/core/network/repository/MQTTRepositoryImpl.kt
+++ /dev/null
@@ -1,178 +0,0 @@
-/*
- * Copyright (c) 2025-2026 Meshtastic LLC
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation, either version 3 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program. If not, see .
- */
-package org.meshtastic.core.network.repository
-
-import co.touchlab.kermit.Logger
-import kotlinx.coroutines.channels.awaitClose
-import kotlinx.coroutines.flow.Flow
-import kotlinx.coroutines.flow.callbackFlow
-import kotlinx.coroutines.flow.first
-import okio.ByteString.Companion.toByteString
-import org.eclipse.paho.client.mqttv3.DisconnectedBufferOptions
-import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken
-import org.eclipse.paho.client.mqttv3.MqttAsyncClient
-import org.eclipse.paho.client.mqttv3.MqttAsyncClient.generateClientId
-import org.eclipse.paho.client.mqttv3.MqttCallbackExtended
-import org.eclipse.paho.client.mqttv3.MqttConnectOptions
-import org.eclipse.paho.client.mqttv3.MqttMessage
-import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence
-import org.koin.core.annotation.Single
-import org.meshtastic.core.common.util.ignoreException
-import org.meshtastic.core.model.util.subscribeList
-import org.meshtastic.core.repository.NodeRepository
-import org.meshtastic.core.repository.RadioConfigRepository
-import org.meshtastic.proto.MqttClientProxyMessage
-import java.net.URI
-import java.security.SecureRandom
-import javax.net.ssl.SSLContext
-import javax.net.ssl.TrustManager
-
-@Single
-class MQTTRepositoryImpl
-constructor(
- private val radioConfigRepository: RadioConfigRepository,
- private val nodeRepository: NodeRepository,
-) : MQTTRepository {
-
- companion object {
- /**
- * Quality of Service (QoS) levels in MQTT:
- * - QoS 0: "at most once". Packets are sent once without validation if it has been received.
- * - QoS 1: "at least once". Packets are sent and stored until the client receives confirmation from the server.
- * MQTT ensures delivery, but duplicates may occur.
- * - QoS 2: "exactly once". Similar to QoS 1, but with no duplicates.
- */
- private const val DEFAULT_QOS = 1
- private const val DEFAULT_TOPIC_ROOT = "msh"
- private const val DEFAULT_TOPIC_LEVEL = "/2/e/"
- private const val JSON_TOPIC_LEVEL = "/2/json/"
- private const val DEFAULT_SERVER_ADDRESS = "mqtt.meshtastic.org"
- }
-
- private var mqttClient: MqttAsyncClient? = null
-
- override fun disconnect() {
- Logger.i { "MQTT Disconnected" }
- mqttClient?.apply {
- if (isConnected) {
- ignoreException { disconnect() }
- }
- ignoreException { close(true) }
- }
- mqttClient = null
- }
-
- override val proxyMessageFlow: Flow = callbackFlow {
- val ownerId = "MeshtasticAndroidMqttProxy-${nodeRepository.myId.value ?: generateClientId()}"
- val channelSet = radioConfigRepository.channelSetFlow.first()
- val mqttConfig = radioConfigRepository.moduleConfigFlow.first().mqtt
-
- val sslContext = SSLContext.getInstance("TLS")
- // Create a custom SSLContext that trusts all certificates
- sslContext.init(null, arrayOf(TrustAllX509TrustManager()), SecureRandom())
-
- val rootTopic = mqttConfig?.root?.ifEmpty { DEFAULT_TOPIC_ROOT }
-
- val connectOptions =
- MqttConnectOptions().apply {
- userName = mqttConfig?.username
- password = mqttConfig?.password?.toCharArray()
- isAutomaticReconnect = true
- if (mqttConfig?.tls_enabled == true) {
- socketFactory = sslContext.socketFactory
- }
- }
-
- @Suppress("MagicNumber")
- val bufferOptions =
- DisconnectedBufferOptions().apply {
- isBufferEnabled = true
- bufferSize = 512
- isPersistBuffer = false
- isDeleteOldestMessages = true
- }
-
- val callback =
- object : MqttCallbackExtended {
- override fun connectComplete(reconnect: Boolean, serverURI: String) {
- Logger.i { "MQTT connectComplete: $serverURI reconnect: $reconnect" }
- channelSet.subscribeList
- .ifEmpty {
- return
- }
- .forEach { globalId ->
- subscribe("$rootTopic$DEFAULT_TOPIC_LEVEL$globalId/+")
- if (mqttConfig?.json_enabled == true) subscribe("$rootTopic$JSON_TOPIC_LEVEL$globalId/+")
- }
- subscribe("$rootTopic${DEFAULT_TOPIC_LEVEL}PKI/+")
- }
-
- override fun connectionLost(cause: Throwable) {
- Logger.i { "MQTT connectionLost cause: $cause" }
- if (cause is IllegalArgumentException) close(cause)
- }
-
- override fun messageArrived(topic: String, message: MqttMessage) {
- trySend(
- MqttClientProxyMessage(
- topic = topic,
- data_ = message.payload.toByteString(),
- retained = message.isRetained,
- ),
- )
- }
-
- override fun deliveryComplete(token: IMqttDeliveryToken?) {
- Logger.i { "MQTT deliveryComplete messageId: ${token?.messageId}" }
- }
- }
-
- val scheme = if (mqttConfig?.tls_enabled == true) "ssl" else "tcp"
- val (host, port) =
- (mqttConfig?.address ?: DEFAULT_SERVER_ADDRESS).split(":", limit = 2).let {
- it[0] to (it.getOrNull(1)?.toIntOrNull() ?: -1)
- }
-
- mqttClient =
- MqttAsyncClient(URI(scheme, null, host, port, "", "", "").toString(), ownerId, MemoryPersistence()).apply {
- setCallback(callback)
- setBufferOpts(bufferOptions)
- connect(connectOptions)
- }
-
- awaitClose { disconnect() }
- }
-
- private fun subscribe(topic: String) {
- mqttClient?.subscribe(topic, DEFAULT_QOS)
- Logger.i { "MQTT Subscribed to topic: $topic" }
- }
-
- @Suppress("TooGenericExceptionCaught")
- override fun publish(topic: String, data: ByteArray, retained: Boolean) {
- try {
- val token = mqttClient?.publish(topic, data, DEFAULT_QOS, retained)
- Logger.i { "MQTT Publish messageId: ${token?.messageId}" }
- } catch (ex: Exception) {
- if (ex.message?.contains("Client is disconnected") == true) {
- Logger.w { "MQTT Publish skipped: Client is disconnected" }
- } else {
- Logger.e(ex) { "MQTT Publish error: ${ex.message}" }
- }
- }
- }
-}
diff --git a/core/network/src/commonMain/kotlin/org/meshtastic/core/network/repository/MQTTRepositoryImpl.kt b/core/network/src/commonMain/kotlin/org/meshtastic/core/network/repository/MQTTRepositoryImpl.kt
new file mode 100644
index 000000000..e6711f9db
--- /dev/null
+++ b/core/network/src/commonMain/kotlin/org/meshtastic/core/network/repository/MQTTRepositoryImpl.kt
@@ -0,0 +1,167 @@
+/*
+ * Copyright (c) 2025-2026 Meshtastic LLC
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see .
+ */
+package org.meshtastic.core.network.repository
+
+import co.touchlab.kermit.Logger
+import io.github.davidepianca98.MQTTClient
+import io.github.davidepianca98.mqtt.MQTTVersion
+import io.github.davidepianca98.mqtt.Subscription
+import io.github.davidepianca98.mqtt.packets.Qos
+import io.github.davidepianca98.mqtt.packets.mqttv5.SubscriptionOptions
+import io.github.davidepianca98.socket.tls.TLSClientSettings
+import kotlinx.coroutines.CoroutineScope
+import kotlinx.coroutines.Dispatchers
+import kotlinx.coroutines.Job
+import kotlinx.coroutines.SupervisorJob
+import kotlinx.coroutines.channels.awaitClose
+import kotlinx.coroutines.flow.Flow
+import kotlinx.coroutines.flow.callbackFlow
+import kotlinx.coroutines.flow.first
+import kotlinx.coroutines.launch
+import kotlinx.serialization.json.Json
+import okio.ByteString.Companion.toByteString
+import org.koin.core.annotation.Single
+import org.meshtastic.core.model.MqttJsonPayload
+import org.meshtastic.core.model.util.subscribeList
+import org.meshtastic.core.repository.NodeRepository
+import org.meshtastic.core.repository.RadioConfigRepository
+import org.meshtastic.proto.MqttClientProxyMessage
+
+@Single(binds = [MQTTRepository::class])
+class MQTTRepositoryImpl(
+ private val radioConfigRepository: RadioConfigRepository,
+ private val nodeRepository: NodeRepository,
+) : MQTTRepository {
+
+ companion object {
+ private const val DEFAULT_TOPIC_ROOT = "msh"
+ private const val DEFAULT_TOPIC_LEVEL = "/2/e/"
+ private const val JSON_TOPIC_LEVEL = "/2/json/"
+ private const val DEFAULT_SERVER_ADDRESS = "mqtt.meshtastic.org"
+ }
+
+ private var client: MQTTClient? = null
+ private val json = Json { ignoreUnknownKeys = true }
+ private val scope = CoroutineScope(Dispatchers.Default + SupervisorJob())
+ private var clientJob: Job? = null
+
+ override fun disconnect() {
+ Logger.i { "MQTT Disconnecting" }
+ clientJob?.cancel()
+ clientJob = null
+ client = null
+ }
+
+ @OptIn(ExperimentalUnsignedTypes::class)
+ override val proxyMessageFlow: Flow = callbackFlow {
+ val ownerId = "MeshtasticAndroidMqttProxy-${nodeRepository.myId.value ?: "unknown"}"
+ val channelSet = radioConfigRepository.channelSetFlow.first()
+ val mqttConfig = radioConfigRepository.moduleConfigFlow.first().mqtt
+
+ val rootTopic = mqttConfig?.root?.ifEmpty { DEFAULT_TOPIC_ROOT } ?: DEFAULT_TOPIC_ROOT
+
+ val (host, port) =
+ (mqttConfig?.address ?: DEFAULT_SERVER_ADDRESS).split(":", limit = 2).let {
+ it[0] to (it.getOrNull(1)?.toIntOrNull() ?: if (mqttConfig?.tls_enabled == true) 8883 else 1883)
+ }
+
+ val newClient =
+ MQTTClient(
+ mqttVersion = MQTTVersion.MQTT5,
+ address = host,
+ port = port,
+ tls = if (mqttConfig?.tls_enabled == true) TLSClientSettings() else null,
+ userName = mqttConfig?.username,
+ password = mqttConfig?.password?.encodeToByteArray()?.toUByteArray(),
+ clientId = ownerId,
+ publishReceived = { packet ->
+ val topic = packet.topicName
+ val payload = packet.payload?.toByteArray()
+ Logger.d { "MQTT received message on topic $topic (size: ${payload?.size ?: 0} bytes)" }
+
+ if (topic.contains("/json/")) {
+ try {
+ val jsonStr = payload?.decodeToString() ?: ""
+ // Validate JSON by parsing it
+ json.decodeFromString(jsonStr)
+ Logger.d { "MQTT parsed JSON payload successfully" }
+
+ trySend(MqttClientProxyMessage(topic = topic, text = jsonStr, retained = packet.retain))
+ } catch (e: kotlinx.serialization.SerializationException) {
+ Logger.e(e) { "Failed to parse MQTT JSON: ${e.message}" }
+ } catch (e: IllegalArgumentException) {
+ Logger.e(e) { "Failed to parse MQTT JSON: ${e.message}" }
+ }
+ } else {
+ trySend(
+ MqttClientProxyMessage(
+ topic = topic,
+ data_ = payload?.toByteString() ?: okio.ByteString.EMPTY,
+ retained = packet.retain,
+ ),
+ )
+ }
+ },
+ )
+
+ client = newClient
+
+ clientJob =
+ scope.launch {
+ try {
+ Logger.i { "MQTT Starting client loop for $host:$port" }
+ newClient.runSuspend()
+ } catch (e: io.github.davidepianca98.mqtt.MQTTException) {
+ Logger.e(e) { "MQTT Client loop error (MQTT)" }
+ close(e)
+ } catch (e: io.github.davidepianca98.socket.IOException) {
+ Logger.e(e) { "MQTT Client loop error (IO)" }
+ close(e)
+ } catch (e: kotlinx.coroutines.CancellationException) {
+ Logger.i { "MQTT Client loop cancelled" }
+ throw e
+ }
+ }
+
+ // Subscriptions
+ val subscriptions = mutableListOf()
+ channelSet.subscribeList.forEach { globalId ->
+ subscriptions.add(
+ Subscription("$rootTopic$DEFAULT_TOPIC_LEVEL$globalId/+", SubscriptionOptions(Qos.AT_LEAST_ONCE)),
+ )
+ if (mqttConfig?.json_enabled == true) {
+ subscriptions.add(
+ Subscription("$rootTopic$JSON_TOPIC_LEVEL$globalId/+", SubscriptionOptions(Qos.AT_LEAST_ONCE)),
+ )
+ }
+ }
+ subscriptions.add(Subscription("$rootTopic${DEFAULT_TOPIC_LEVEL}PKI/+", SubscriptionOptions(Qos.AT_LEAST_ONCE)))
+
+ if (subscriptions.isNotEmpty()) {
+ Logger.d { "MQTT subscribing to ${subscriptions.size} topics" }
+ newClient.subscribe(subscriptions)
+ }
+
+ awaitClose { disconnect() }
+ }
+
+ @OptIn(ExperimentalUnsignedTypes::class)
+ override fun publish(topic: String, data: ByteArray, retained: Boolean) {
+ Logger.d { "MQTT publishing message to topic $topic (size: ${data.size} bytes, retained: $retained)" }
+ client?.publish(retain = retained, qos = Qos.AT_LEAST_ONCE, topic = topic, payload = data.toUByteArray())
+ }
+}
diff --git a/core/network/src/commonTest/kotlin/org/meshtastic/core/network/repository/MQTTRepositoryImplTest.kt b/core/network/src/commonTest/kotlin/org/meshtastic/core/network/repository/MQTTRepositoryImplTest.kt
new file mode 100644
index 000000000..446b1a8b3
--- /dev/null
+++ b/core/network/src/commonTest/kotlin/org/meshtastic/core/network/repository/MQTTRepositoryImplTest.kt
@@ -0,0 +1,74 @@
+/*
+ * Copyright (c) 2026 Meshtastic LLC
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see .
+ */
+package org.meshtastic.core.network.repository
+
+import kotlinx.serialization.json.Json
+import org.meshtastic.core.model.MqttJsonPayload
+import kotlin.test.Test
+import kotlin.test.assertEquals
+
+class MQTTRepositoryImplTest {
+
+ @Test
+ fun `test address parsing logic`() {
+ val address1 = "mqtt.example.com:1883"
+ val (host1, port1) = address1.split(":", limit = 2).let { it[0] to (it.getOrNull(1)?.toIntOrNull() ?: 1883) }
+ assertEquals("mqtt.example.com", host1)
+ assertEquals(1883, port1)
+
+ val address2 = "mqtt.example.com"
+ val (host2, port2) = address2.split(":", limit = 2).let { it[0] to (it.getOrNull(1)?.toIntOrNull() ?: 1883) }
+ assertEquals("mqtt.example.com", host2)
+ assertEquals(1883, port2)
+ }
+
+ @Test
+ fun `test json payload parsing`() {
+ val jsonStr =
+ """{"type":"text","from":12345678,"to":4294967295,"payload":"Hello World","hop_limit":3,"id":123,"time":1600000000}"""
+ val json = Json { ignoreUnknownKeys = true }
+ val payload = json.decodeFromString(jsonStr)
+
+ assertEquals("text", payload.type)
+ assertEquals(12345678L, payload.from)
+ assertEquals(4294967295L, payload.to)
+ assertEquals("Hello World", payload.payload)
+ assertEquals(3, payload.hopLimit)
+ assertEquals(123L, payload.id)
+ assertEquals(1600000000L, payload.time)
+ }
+
+ @Test
+ fun `test json payload serialization`() {
+ val payload =
+ MqttJsonPayload(
+ type = "text",
+ from = 12345678,
+ to = 4294967295,
+ payload = "Hello World",
+ hopLimit = 3,
+ id = 123,
+ time = 1600000000,
+ )
+ val json = Json { ignoreUnknownKeys = true }
+ val jsonStr = json.encodeToString(MqttJsonPayload.serializer(), payload)
+
+ assert(jsonStr.contains("\"type\":\"text\""))
+ assert(jsonStr.contains("\"from\":12345678"))
+ assert(jsonStr.contains("\"payload\":\"Hello World\""))
+ }
+}
diff --git a/docs/decisions/architecture-review-2026-03.md b/docs/decisions/architecture-review-2026-03.md
index fbad97ebd..c98a2137e 100644
--- a/docs/decisions/architecture-review-2026-03.md
+++ b/docs/decisions/architecture-review-2026-03.md
@@ -109,16 +109,12 @@ Formerly found in 3 prefs files:
**Outcome:** These caches now use `AtomicRef>` helpers in `commonMain`, eliminating the last `ConcurrentHashMap` usage from shared prefs code.
-### B3. MQTT is Android-only
+### B3. MQTT (Resolved)
-`MQTTRepositoryImpl` in `core:network/androidMain` uses Eclipse Paho (Java-only). Desktop and future iOS stub it.
+`MQTTRepositoryImpl` has been migrated to `commonMain` using KMQTT, replacing Eclipse Paho.
-**Fix:** Evaluate KMP MQTT options:
-- `mqtt-kmp` library
-- Ktor WebSocket-based MQTT
-- `hivemq-mqtt-client` (JVM-only, acceptable for `jvmAndroidMain`)
-
-Short-term: Move to `jvmAndroidMain` if using a JVM-compatible lib. Long-term: Full KMP MQTT in `commonMain`.
+**Fix:** Completed.
+- `kmqtt` library integrated for full KMP support.
### B4. Vico charts *(resolved)*
diff --git a/docs/roadmap.md b/docs/roadmap.md
index 0dd6adc5e..4cc50e3e4 100644
--- a/docs/roadmap.md
+++ b/docs/roadmap.md
@@ -54,7 +54,7 @@ here| **Migrate to JetBrains Compose Multiplatform dependencies** | High | Low |
|---|---|---|
| TCP | Desktop (JVM) | ✅ Done — shared `StreamFrameCodec` + `TcpTransport` in `core:network` |
| Serial/USB | Desktop (JVM) | ✅ Done — jSerialComm |
-| MQTT | All (KMP) | ❌ Planned — Ktor/MQTT (currently Android-only via Eclipse Paho) |
+| MQTT | All (KMP) | ✅ Completed — KMQTT in commonMain |
| BLE | Android | ✅ Done — Kable |
| BLE | Desktop | ✅ Done — Kable (JVM) |
| BLE | iOS | ❌ Future — Kable/CoreBluetooth |
@@ -93,7 +93,7 @@ here| **Migrate to JetBrains Compose Multiplatform dependencies** | High | Low |
- ✅ **Done:** Extracted service, worker, and radio files from `app` to `core:service/androidMain` and `core:network/androidMain`.
- **Next:** Extract remaining Android-specific files (e.g., Navigation files, App Widgets, message queues, and root Activity logic) out of `:app` to establish a truly thin app module.
2. ✅ **Done:** **Serial/USB transport** — direct radio connection on Desktop via jSerialComm
-3. **MQTT transport** — cloud relay operation (KMP, benefits all targets)
+3. **MQTT transport** — cloud relay operation (KMP, benefits all targets) ✅
4. **Evaluate KMP-native testing tools** — Evaluate `Mokkery` or `Mockative` to replace `mockk` in `commonMain` of `core:testing` for iOS readiness. Integrate `Turbine` for shared `Flow` testing.
5. **Desktop ViewModel auto-wiring** — ✅ Done: ensured Koin K2 Compiler Plugin generates ViewModel modules for JVM target; eliminated manual wiring in `DesktopKoinModule`
5. **KMP charting** — ✅ Done: Vico charts migrated to `feature:node/commonMain` using KMP artifacts; desktop wires them directly
diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml
index 4089a1151..fc11b2d2c 100644
--- a/gradle/libs.versions.toml
+++ b/gradle/libs.versions.toml
@@ -66,6 +66,7 @@ vico = "3.0.3"
dependency-guard = "0.5.0"
kable = "0.42.0"
nordic-dfu = "2.11.0"
+kmqtt = "1.0.0"
[libraries]
@@ -218,8 +219,9 @@ material = { module = "com.google.android.material:material", version = "1.13.0"
nordic-dfu = { module = "no.nordicsemi.android:dfu", version.ref = "nordic-dfu" }
kable-core = { module = "com.juul.kable:kable-core", version.ref = "kable" }
+kmqtt-client = { module = "io.github.davidepianca98:kmqtt-client", version.ref = "kmqtt" }
+kmqtt-common = { module = "io.github.davidepianca98:kmqtt-common", version.ref = "kmqtt" }
-org-eclipse-paho-client-mqttv3 = { module = "org.eclipse.paho:org.eclipse.paho.client.mqttv3", version = "1.2.5" }
jserialcomm = { module = "com.fazecast:jSerialComm", version.ref = "jserialcomm" }
okio = { module = "com.squareup.okio:okio", version.ref = "okio" }
osmbonuspack = { module = "com.github.MKergall:osmbonuspack", version = "6.9.0" }