diff --git a/core/data/src/commonMain/kotlin/org/meshtastic/core/data/manager/MqttManagerImpl.kt b/core/data/src/commonMain/kotlin/org/meshtastic/core/data/manager/MqttManagerImpl.kt index b928e8505..9940db706 100644 --- a/core/data/src/commonMain/kotlin/org/meshtastic/core/data/manager/MqttManagerImpl.kt +++ b/core/data/src/commonMain/kotlin/org/meshtastic/core/data/manager/MqttManagerImpl.kt @@ -20,15 +20,23 @@ import co.touchlab.kermit.Logger import co.touchlab.kermit.Severity import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Job +import kotlinx.coroutines.flow.MutableStateFlow +import kotlinx.coroutines.flow.SharingStarted +import kotlinx.coroutines.flow.StateFlow import kotlinx.coroutines.flow.catch +import kotlinx.coroutines.flow.combine import kotlinx.coroutines.flow.launchIn import kotlinx.coroutines.flow.onEach +import kotlinx.coroutines.flow.stateIn import org.koin.core.annotation.Named import org.koin.core.annotation.Single +import org.meshtastic.core.model.MqttConnectionState import org.meshtastic.core.network.repository.MQTTRepository import org.meshtastic.core.repository.MqttManager import org.meshtastic.core.repository.PacketHandler import org.meshtastic.core.repository.ServiceRepository +import org.meshtastic.mqtt.ConnectionState +import org.meshtastic.mqtt.MqttException import org.meshtastic.proto.MqttClientProxyMessage import org.meshtastic.proto.ToRadio @@ -40,18 +48,30 @@ class MqttManagerImpl( @Named("ServiceScope") private val scope: CoroutineScope, ) : MqttManager { private var mqttMessageFlow: Job? = null + private val proxyActive = MutableStateFlow(false) + + override val mqttConnectionState: StateFlow = + combine(proxyActive, mqttRepository.connectionState) { active, libState -> + if (!active) MqttConnectionState.INACTIVE else libState.toAppState() + } + .stateIn(scope, SharingStarted.Eagerly, MqttConnectionState.INACTIVE) override fun startProxy(enabled: Boolean, proxyToClientEnabled: Boolean) { if (mqttMessageFlow?.isActive == true) return if (enabled && proxyToClientEnabled) { + proxyActive.value = true mqttMessageFlow = mqttRepository.proxyMessageFlow .onEach { message -> packetHandler.sendToRadio(ToRadio(mqttClientProxyMessage = message)) } .catch { throwable -> - serviceRepository.setErrorMessage( - text = "MqttClientProxy failed: $throwable", - severity = Severity.Warn, - ) + proxyActive.value = false + val message = + when (throwable) { + is MqttException.ConnectionRejected -> "MQTT: connection rejected (check credentials)" + is MqttException.ConnectionLost -> "MQTT: connection lost" + else -> "MQTT proxy failed: ${throwable.message}" + } + serviceRepository.setErrorMessage(text = message, severity = Severity.Warn) } .launchIn(scope) } @@ -63,6 +83,7 @@ class MqttManagerImpl( mqttMessageFlow?.cancel() mqttMessageFlow = null } + proxyActive.value = false } override fun handleMqttProxyMessage(message: MqttClientProxyMessage) { @@ -79,4 +100,11 @@ class MqttManagerImpl( else -> {} } } + + private fun ConnectionState.toAppState(): MqttConnectionState = when (this) { + ConnectionState.DISCONNECTED -> MqttConnectionState.DISCONNECTED + ConnectionState.CONNECTING -> MqttConnectionState.CONNECTING + ConnectionState.CONNECTED -> MqttConnectionState.CONNECTED + ConnectionState.RECONNECTING -> MqttConnectionState.RECONNECTING + } } diff --git a/core/model/src/commonMain/kotlin/org/meshtastic/core/model/MqttConnectionState.kt b/core/model/src/commonMain/kotlin/org/meshtastic/core/model/MqttConnectionState.kt new file mode 100644 index 000000000..6a5b9ad15 --- /dev/null +++ b/core/model/src/commonMain/kotlin/org/meshtastic/core/model/MqttConnectionState.kt @@ -0,0 +1,35 @@ +/* + * Copyright (c) 2025-2026 Meshtastic LLC + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +package org.meshtastic.core.model + +/** App-level MQTT proxy connection state, decoupled from the MQTT library's internal type. */ +enum class MqttConnectionState { + /** The MQTT proxy has not been started (disabled or not yet initialized). */ + INACTIVE, + + /** The MQTT client is not connected to the broker. */ + DISCONNECTED, + + /** The MQTT client is actively connecting to the broker. */ + CONNECTING, + + /** The MQTT client is connected and subscribed to topics. */ + CONNECTED, + + /** The MQTT client lost connection and is attempting to reconnect. */ + RECONNECTING, +} diff --git a/core/network/build.gradle.kts b/core/network/build.gradle.kts index c3dc2ffd5..f2fb85d7f 100644 --- a/core/network/build.gradle.kts +++ b/core/network/build.gradle.kts @@ -40,8 +40,7 @@ kotlin { implementation(projects.core.ble) implementation(libs.okio) - implementation(libs.kmqtt.client) - implementation(libs.kmqtt.common) + api(libs.meshtastic.mqtt.client) implementation(libs.kotlinx.serialization.json) implementation(libs.ktor.client.core) implementation(libs.ktor.client.content.negotiation) diff --git a/core/network/src/commonMain/kotlin/org/meshtastic/core/network/repository/MQTTRepository.kt b/core/network/src/commonMain/kotlin/org/meshtastic/core/network/repository/MQTTRepository.kt index fe092fd7c..9efb9150b 100644 --- a/core/network/src/commonMain/kotlin/org/meshtastic/core/network/repository/MQTTRepository.kt +++ b/core/network/src/commonMain/kotlin/org/meshtastic/core/network/repository/MQTTRepository.kt @@ -17,6 +17,8 @@ package org.meshtastic.core.network.repository import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.StateFlow +import org.meshtastic.mqtt.ConnectionState import org.meshtastic.proto.MqttClientProxyMessage /** Interface defining the MQTT interactions used for proxying messages to and from the mesh. */ @@ -38,4 +40,7 @@ interface MQTTRepository { * @param retained Whether the message should be retained by the broker. */ fun publish(topic: String, data: ByteArray, retained: Boolean) + + /** Observable MQTT connection lifecycle state (DISCONNECTED → CONNECTING → CONNECTED → RECONNECTING). */ + val connectionState: StateFlow } diff --git a/core/network/src/commonMain/kotlin/org/meshtastic/core/network/repository/MQTTRepositoryImpl.kt b/core/network/src/commonMain/kotlin/org/meshtastic/core/network/repository/MQTTRepositoryImpl.kt index 5e4ffa91d..94ab7f0ce 100644 --- a/core/network/src/commonMain/kotlin/org/meshtastic/core/network/repository/MQTTRepositoryImpl.kt +++ b/core/network/src/commonMain/kotlin/org/meshtastic/core/network/repository/MQTTRepositoryImpl.kt @@ -17,22 +17,15 @@ package org.meshtastic.core.network.repository import co.touchlab.kermit.Logger -import io.github.davidepianca98.MQTTClient -import io.github.davidepianca98.mqtt.MQTTException -import io.github.davidepianca98.mqtt.MQTTVersion -import io.github.davidepianca98.mqtt.Subscription -import io.github.davidepianca98.mqtt.packets.Qos -import io.github.davidepianca98.mqtt.packets.mqttv5.ReasonCode -import io.github.davidepianca98.mqtt.packets.mqttv5.SubscriptionOptions -import io.github.davidepianca98.socket.IOException -import io.github.davidepianca98.socket.tls.TLSClientSettings -import kotlinx.coroutines.CancellationException import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.Job import kotlinx.coroutines.SupervisorJob +import kotlinx.coroutines.channels.ProducerScope import kotlinx.coroutines.channels.awaitClose import kotlinx.coroutines.delay import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.MutableStateFlow +import kotlinx.coroutines.flow.StateFlow +import kotlinx.coroutines.flow.asStateFlow import kotlinx.coroutines.flow.callbackFlow import kotlinx.coroutines.flow.first import kotlinx.coroutines.launch @@ -44,11 +37,19 @@ import kotlinx.serialization.json.Json import kotlinx.serialization.json.JsonDecodingException import okio.ByteString.Companion.toByteString import org.koin.core.annotation.Single +import org.meshtastic.core.common.util.safeCatching import org.meshtastic.core.di.CoroutineDispatchers import org.meshtastic.core.model.MqttJsonPayload import org.meshtastic.core.model.util.subscribeList import org.meshtastic.core.repository.NodeRepository import org.meshtastic.core.repository.RadioConfigRepository +import org.meshtastic.mqtt.ConnectionState +import org.meshtastic.mqtt.MqttClient +import org.meshtastic.mqtt.MqttEndpoint +import org.meshtastic.mqtt.MqttException +import org.meshtastic.mqtt.MqttMessage +import org.meshtastic.mqtt.QoS +import org.meshtastic.mqtt.packet.Subscription import org.meshtastic.proto.MqttClientProxyMessage import kotlin.concurrent.Volatile @@ -64,12 +65,17 @@ class MQTTRepositoryImpl( private const val DEFAULT_TOPIC_LEVEL = "/2/e/" private const val JSON_TOPIC_LEVEL = "/2/json/" private const val DEFAULT_SERVER_ADDRESS = "mqtt.meshtastic.org" + private const val WEBSOCKET_PATH = "/mqtt" + private const val KEEPALIVE_SECONDS = 30 private const val INITIAL_RECONNECT_DELAY_MS = 1000L private const val MAX_RECONNECT_DELAY_MS = 30_000L private const val RECONNECT_BACKOFF_MULTIPLIER = 2 } - @Volatile private var client: MQTTClient? = null + @Volatile private var client: MqttClient? = null + + private val _connectionState = MutableStateFlow(ConnectionState.DISCONNECTED) + override val connectionState: StateFlow = _connectionState.asStateFlow() @OptIn(ExperimentalSerializationApi::class) private val json = Json { @@ -77,25 +83,17 @@ class MQTTRepositoryImpl( exceptionsWithDebugInfo = false } private val scope = CoroutineScope(dispatchers.default + SupervisorJob()) - - @Volatile private var clientJob: Job? = null private val publishSemaphore = Semaphore(20) - @Suppress("TooGenericExceptionCaught") override fun disconnect() { Logger.i { "MQTT Disconnecting" } val c = client - client = null // Null first to prevent re-entrant disconnect - try { - c?.disconnect(ReasonCode.SUCCESS) - } catch (e: Exception) { - Logger.w(e) { "MQTT clean disconnect failed" } - } - clientJob?.cancel() - clientJob = null + client = null + _connectionState.value = ConnectionState.DISCONNECTED + scope.launch { safeCatching { c?.close() }.onFailure { e -> Logger.w(e) { "MQTT clean disconnect failed" } } } } - @OptIn(ExperimentalUnsignedTypes::class) + @OptIn(ExperimentalSerializationApi::class) override val proxyMessageFlow: Flow = callbackFlow { val ownerId = "MeshtasticAndroidMqttProxy-${nodeRepository.myId.value ?: "unknown"}" val channelSet = radioConfigRepository.channelSetFlow.first() @@ -103,108 +101,112 @@ class MQTTRepositoryImpl( val rootTopic = mqttConfig?.root?.ifEmpty { DEFAULT_TOPIC_ROOT } ?: DEFAULT_TOPIC_ROOT - val (host, port) = - (mqttConfig?.address ?: DEFAULT_SERVER_ADDRESS).split(":", limit = 2).let { - it[0] to (it.getOrNull(1)?.toIntOrNull() ?: if (mqttConfig?.tls_enabled == true) 8883 else 1883) + val rawAddress = mqttConfig?.address ?: DEFAULT_SERVER_ADDRESS + val endpoint = + if (rawAddress.contains("://")) { + MqttEndpoint.parse(rawAddress) + } else { + // Use WebSocket transport on all platforms for firewall/CDN compatibility. + val scheme = if (mqttConfig?.tls_enabled == true) "wss" else "ws" + MqttEndpoint.parse("$scheme://$rawAddress$WEBSOCKET_PATH") } val newClient = - MQTTClient( - mqttVersion = MQTTVersion.MQTT5, - address = host, - port = port, - tls = if (mqttConfig?.tls_enabled == true) TLSClientSettings() else null, - userName = mqttConfig?.username, - password = mqttConfig?.password?.encodeToByteArray()?.toUByteArray(), - clientId = ownerId, - publishReceived = { packet -> - val topic = packet.topicName - val payload = packet.payload?.toByteArray() - Logger.d { "MQTT received message on topic $topic (size: ${payload?.size ?: 0} bytes)" } - - if (topic.contains("/json/")) { - try { - val jsonStr = payload?.decodeToString() ?: "" - // Validate JSON by parsing it - json.decodeFromString(jsonStr) - Logger.d { "MQTT parsed JSON payload successfully" } - - trySend(MqttClientProxyMessage(topic = topic, text = jsonStr, retained = packet.retain)) - } catch (e: JsonDecodingException) { - @OptIn(ExperimentalSerializationApi::class) - Logger.e(e) { "Failed to parse MQTT JSON: ${e.shortMessage} (path: ${e.path})" } - } catch (e: SerializationException) { - Logger.e(e) { "Failed to parse MQTT JSON: ${e.message}" } - } catch (e: IllegalArgumentException) { - Logger.e(e) { "Failed to parse MQTT JSON: ${e.message}" } - } - } else { - trySend( - MqttClientProxyMessage( - topic = topic, - data_ = payload?.toByteString() ?: okio.ByteString.EMPTY, - retained = packet.retain, - ), - ) - } - }, - ) - + MqttClient(ownerId) { + keepAliveSeconds = KEEPALIVE_SECONDS + autoReconnect = true + username = mqttConfig?.username + mqttConfig?.password?.let { password(it) } + } client = newClient - // Subscribe before starting the event loop. KMQTT's subscribe() calls send(), - // which queues the SUBSCRIBE packet in pendingSendMessages while connackReceived - // is false. Once the event loop receives CONNACK, it flushes the queue — so - // subscriptions are guaranteed to be sent immediately after the connection is - // established, with no timing races. This replaces a previous yield()-based - // approach that was unreliable on lightly loaded dispatchers. - val subscriptions = mutableListOf() - channelSet.subscribeList.forEach { globalId -> - subscriptions.add( - Subscription("$rootTopic$DEFAULT_TOPIC_LEVEL$globalId/+", SubscriptionOptions(Qos.AT_LEAST_ONCE)), - ) - if (mqttConfig?.json_enabled == true) { - subscriptions.add( - Subscription("$rootTopic$JSON_TOPIC_LEVEL$globalId/+", SubscriptionOptions(Qos.AT_LEAST_ONCE)), + val subscriptions: List = buildList { + channelSet.subscribeList.forEach { globalId -> + add( + Subscription( + "$rootTopic$DEFAULT_TOPIC_LEVEL$globalId/+", + maxQos = QoS.AT_LEAST_ONCE, + noLocal = true, + ), ) - } - } - subscriptions.add(Subscription("$rootTopic${DEFAULT_TOPIC_LEVEL}PKI/+", SubscriptionOptions(Qos.AT_LEAST_ONCE))) - - if (subscriptions.isNotEmpty()) { - Logger.d { "MQTT subscribing to ${subscriptions.size} topics" } - newClient.subscribe(subscriptions) - } - - clientJob = - scope.launch { - var reconnectDelay = INITIAL_RECONNECT_DELAY_MS - while (true) { - try { - Logger.i { "MQTT Starting client loop for $host:$port" } - newClient.runSuspend() - // runSuspend returned normally — broker closed connection cleanly. - // Reset backoff so the next reconnect starts with the minimum delay. - reconnectDelay = INITIAL_RECONNECT_DELAY_MS - Logger.w { "MQTT client loop ended normally, reconnecting in ${reconnectDelay}ms" } - } catch (e: MQTTException) { - Logger.e(e) { "MQTT Client loop error (MQTT), reconnecting in ${reconnectDelay}ms" } - } catch (e: IOException) { - Logger.e(e) { "MQTT Client loop error (IO), reconnecting in ${reconnectDelay}ms" } - } catch (e: CancellationException) { - Logger.i { "MQTT Client loop cancelled" } - throw e - } - delay(reconnectDelay) - reconnectDelay = - (reconnectDelay * RECONNECT_BACKOFF_MULTIPLIER).coerceAtMost(MAX_RECONNECT_DELAY_MS) + if (mqttConfig?.json_enabled == true) { + add( + Subscription( + "$rootTopic$JSON_TOPIC_LEVEL$globalId/+", + maxQos = QoS.AT_LEAST_ONCE, + noLocal = true, + ), + ) } } + add(Subscription("$rootTopic${DEFAULT_TOPIC_LEVEL}PKI/+", maxQos = QoS.AT_LEAST_ONCE, noLocal = true)) + } + + // Collect from the SharedFlow before connecting to avoid missing retained messages + // that arrive immediately after SUBSCRIBE. + launch { newClient.messages.collect { msg -> processMessage(msg) } } + + // Forward the client's connection state to the repo-level StateFlow for UI observation. + launch { newClient.connectionState.collect { _connectionState.value = it } } + + // Retry the initial connect with exponential backoff. Once established, + // autoReconnect handles subsequent drops and re-subscribes internally. + launch { + var reconnectDelay = INITIAL_RECONNECT_DELAY_MS + while (true) { + val result = safeCatching { + Logger.i { "MQTT Connecting to $endpoint" } + newClient.connect(endpoint) + if (subscriptions.isNotEmpty()) { + Logger.d { "MQTT subscribing to ${subscriptions.size} topics" } + newClient.subscribe(subscriptions) + } + Logger.i { "MQTT connected and subscribed" } + } + when { + result.isSuccess -> return@launch + result.exceptionOrNull() is MqttException.ConnectionRejected -> { + Logger.e(result.exceptionOrNull()) { "MQTT connection rejected (unrecoverable), stopping" } + close(result.exceptionOrNull()!!) + return@launch + } + else -> { + Logger.e(result.exceptionOrNull()) { "MQTT connect failed, retrying in ${reconnectDelay}ms" } + delay(reconnectDelay) + reconnectDelay = + (reconnectDelay * RECONNECT_BACKOFF_MULTIPLIER).coerceAtMost(MAX_RECONNECT_DELAY_MS) + } + } + } + } awaitClose { disconnect() } } - @OptIn(ExperimentalUnsignedTypes::class) + @OptIn(ExperimentalSerializationApi::class) + private fun ProducerScope.processMessage(msg: MqttMessage) { + val topic = msg.topic + val payload = msg.payload.toByteArray() + Logger.d { "MQTT received message on topic $topic (size: ${payload.size} bytes)" } + + if (topic.contains("/json/")) { + try { + val jsonStr = payload.decodeToString() + json.decodeFromString(jsonStr) + Logger.d { "MQTT parsed JSON payload successfully" } + trySend(MqttClientProxyMessage(topic = topic, text = jsonStr, retained = msg.retain)) + } catch (e: JsonDecodingException) { + Logger.e(e) { "Failed to parse MQTT JSON: ${e.shortMessage} (path: ${e.path})" } + } catch (e: SerializationException) { + Logger.e(e) { "Failed to parse MQTT JSON: ${e.message}" } + } catch (e: IllegalArgumentException) { + Logger.e(e) { "Failed to parse MQTT JSON: ${e.message}" } + } + } else { + trySend(MqttClientProxyMessage(topic = topic, data_ = payload.toByteString(), retained = msg.retain)) + } + } + override fun publish(topic: String, data: ByteArray, retained: Boolean) { val currentClient = client if (currentClient == null) { @@ -214,17 +216,12 @@ class MQTTRepositoryImpl( Logger.d { "MQTT publishing message to topic $topic (size: ${data.size} bytes, retained: $retained)" } scope.launch { publishSemaphore.withPermit { - @Suppress("TooGenericExceptionCaught") - try { + safeCatching { currentClient.publish( - retain = retained, - qos = Qos.AT_LEAST_ONCE, - topic = topic, - payload = data.toUByteArray(), + MqttMessage(topic = topic, payload = data, qos = QoS.AT_LEAST_ONCE, retain = retained), ) - } catch (e: Exception) { - Logger.w(e) { "MQTT publish to $topic failed" } } + .onFailure { e -> Logger.w(e) { "MQTT publish to $topic failed" } } } } } diff --git a/core/repository/src/commonMain/kotlin/org/meshtastic/core/repository/MqttManager.kt b/core/repository/src/commonMain/kotlin/org/meshtastic/core/repository/MqttManager.kt index 7ebfa0521..d91ae7080 100644 --- a/core/repository/src/commonMain/kotlin/org/meshtastic/core/repository/MqttManager.kt +++ b/core/repository/src/commonMain/kotlin/org/meshtastic/core/repository/MqttManager.kt @@ -16,10 +16,15 @@ */ package org.meshtastic.core.repository +import kotlinx.coroutines.flow.StateFlow +import org.meshtastic.core.model.MqttConnectionState import org.meshtastic.proto.MqttClientProxyMessage /** Interface for managing MQTT proxy communication. */ interface MqttManager { + /** Observable MQTT proxy connection state for UI consumption. */ + val mqttConnectionState: StateFlow + /** Starts the MQTT proxy with the given settings. */ fun startProxy(enabled: Boolean, proxyToClientEnabled: Boolean) diff --git a/core/resources/src/commonMain/composeResources/values/strings.xml b/core/resources/src/commonMain/composeResources/values/strings.xml index 4748844c6..9bd1b68de 100644 --- a/core/resources/src/commonMain/composeResources/values/strings.xml +++ b/core/resources/src/commonMain/composeResources/values/strings.xml @@ -638,6 +638,11 @@ Ignore MQTT Ok to MQTT MQTT Config + Inactive + Disconnected + Connecting… + Connected + Reconnecting… MQTT enabled Address Username diff --git a/desktop/src/main/kotlin/org/meshtastic/desktop/stub/NoopStubs.kt b/desktop/src/main/kotlin/org/meshtastic/desktop/stub/NoopStubs.kt index 985a76987..f366d821b 100644 --- a/desktop/src/main/kotlin/org/meshtastic/desktop/stub/NoopStubs.kt +++ b/desktop/src/main/kotlin/org/meshtastic/desktop/stub/NoopStubs.kt @@ -44,6 +44,7 @@ import org.meshtastic.core.repository.PlatformAnalytics import org.meshtastic.core.repository.RadioInterfaceService import org.meshtastic.core.repository.ServiceBroadcasts import org.meshtastic.proto.MqttClientProxyMessage +import org.meshtastic.mqtt.ConnectionState as MqttConnectionState import org.meshtastic.proto.Position as ProtoPosition /** @@ -162,6 +163,8 @@ class NoopMQTTRepository : MQTTRepository { override val proxyMessageFlow: Flow = emptyFlow() override fun publish(topic: String, data: ByteArray, retained: Boolean) {} + + override val connectionState = MutableStateFlow(MqttConnectionState.DISCONNECTED) } // endregion diff --git a/feature/settings/src/commonMain/kotlin/org/meshtastic/feature/settings/radio/RadioConfigViewModel.kt b/feature/settings/src/commonMain/kotlin/org/meshtastic/feature/settings/radio/RadioConfigViewModel.kt index 7a946b78b..e443a3f75 100644 --- a/feature/settings/src/commonMain/kotlin/org/meshtastic/feature/settings/radio/RadioConfigViewModel.kt +++ b/feature/settings/src/commonMain/kotlin/org/meshtastic/feature/settings/radio/RadioConfigViewModel.kt @@ -43,6 +43,7 @@ import org.meshtastic.core.domain.usecase.settings.RadioResponseResult import org.meshtastic.core.domain.usecase.settings.ToggleAnalyticsUseCase import org.meshtastic.core.domain.usecase.settings.ToggleHomoglyphEncodingUseCase import org.meshtastic.core.model.ConnectionState +import org.meshtastic.core.model.MqttConnectionState import org.meshtastic.core.model.MyNodeInfo import org.meshtastic.core.model.Node import org.meshtastic.core.model.Position @@ -52,6 +53,7 @@ import org.meshtastic.core.repository.HomoglyphPrefs import org.meshtastic.core.repository.LocationRepository import org.meshtastic.core.repository.LocationService import org.meshtastic.core.repository.MapConsentPrefs +import org.meshtastic.core.repository.MqttManager import org.meshtastic.core.repository.NodeRepository import org.meshtastic.core.repository.PacketRepository import org.meshtastic.core.repository.RadioConfigRepository @@ -125,6 +127,7 @@ open class RadioConfigViewModel( private val processRadioResponseUseCase: ProcessRadioResponseUseCase, private val locationService: LocationService, private val fileService: FileService, + private val mqttManager: MqttManager, ) : ViewModel() { val analyticsAllowedFlow = analyticsPrefs.analyticsAllowed @@ -138,6 +141,9 @@ open class RadioConfigViewModel( toggleHomoglyphEncodingUseCase() } + /** MQTT proxy connection state for the settings UI. */ + val mqttConnectionState: StateFlow = mqttManager.mqttConnectionState + private val destNumFlow = MutableStateFlow(savedStateHandle.get("destNum")) fun initDestNum(id: Int?) { diff --git a/feature/settings/src/commonMain/kotlin/org/meshtastic/feature/settings/radio/component/MQTTConfigItemList.kt b/feature/settings/src/commonMain/kotlin/org/meshtastic/feature/settings/radio/component/MQTTConfigItemList.kt index 0427f9520..972a9d43f 100644 --- a/feature/settings/src/commonMain/kotlin/org/meshtastic/feature/settings/radio/component/MQTTConfigItemList.kt +++ b/feature/settings/src/commonMain/kotlin/org/meshtastic/feature/settings/radio/component/MQTTConfigItemList.kt @@ -18,17 +18,32 @@ package org.meshtastic.feature.settings.radio.component +import androidx.compose.foundation.background +import androidx.compose.foundation.layout.Arrangement +import androidx.compose.foundation.layout.Box +import androidx.compose.foundation.layout.Row +import androidx.compose.foundation.layout.padding +import androidx.compose.foundation.layout.size +import androidx.compose.foundation.shape.CircleShape import androidx.compose.foundation.text.KeyboardActions import androidx.compose.foundation.text.KeyboardOptions import androidx.compose.material3.CardDefaults import androidx.compose.material3.HorizontalDivider +import androidx.compose.material3.MaterialTheme +import androidx.compose.material3.Text import androidx.compose.runtime.Composable import androidx.compose.runtime.getValue +import androidx.compose.ui.Alignment +import androidx.compose.ui.Modifier +import androidx.compose.ui.draw.clip +import androidx.compose.ui.graphics.Color import androidx.compose.ui.platform.LocalFocusManager import androidx.compose.ui.text.input.ImeAction import androidx.compose.ui.text.input.KeyboardType +import androidx.compose.ui.unit.dp import androidx.lifecycle.compose.collectAsStateWithLifecycle import org.jetbrains.compose.resources.stringResource +import org.meshtastic.core.model.MqttConnectionState import org.meshtastic.core.resources.Res import org.meshtastic.core.resources.address import org.meshtastic.core.resources.default_mqtt_address @@ -38,6 +53,11 @@ import org.meshtastic.core.resources.map_reporting import org.meshtastic.core.resources.mqtt import org.meshtastic.core.resources.mqtt_config import org.meshtastic.core.resources.mqtt_enabled +import org.meshtastic.core.resources.mqtt_status_connected +import org.meshtastic.core.resources.mqtt_status_connecting +import org.meshtastic.core.resources.mqtt_status_disconnected +import org.meshtastic.core.resources.mqtt_status_inactive +import org.meshtastic.core.resources.mqtt_status_reconnecting import org.meshtastic.core.resources.password import org.meshtastic.core.resources.proxy_to_client_enabled import org.meshtastic.core.resources.root_topic @@ -54,6 +74,7 @@ import org.meshtastic.proto.ModuleConfig fun MQTTConfigScreen(viewModel: RadioConfigViewModel, onBack: () -> Unit) { val state by viewModel.radioConfigState.collectAsStateWithLifecycle() val destNode by viewModel.destNode.collectAsStateWithLifecycle() + val mqttProxyState by viewModel.mqttConnectionState.collectAsStateWithLifecycle() val destNum = destNode?.num val mqttConfig = state.moduleConfig.mqtt ?: ModuleConfig.MQTTConfig() val formState = rememberConfigState(initialValue = mqttConfig) @@ -86,6 +107,8 @@ fun MQTTConfigScreen(viewModel: RadioConfigViewModel, onBack: () -> Unit) { viewModel.setModuleConfig(config) }, ) { + item { MqttStatusRow(mqttProxyState) } + item { TitledCard(title = stringResource(Res.string.mqtt_config)) { SwitchPreference( @@ -210,3 +233,32 @@ fun MQTTConfigScreen(viewModel: RadioConfigViewModel, onBack: () -> Unit) { } private const val MIN_INTERVAL_SECS = 3600 + +private val AmberColor = Color(0xFFFFA000) +private val GreenColor = Color(0xFF4CAF50) + +@Composable +private fun MqttStatusRow(state: MqttConnectionState) { + val (label, color) = + when (state) { + MqttConnectionState.INACTIVE -> + stringResource(Res.string.mqtt_status_inactive) to MaterialTheme.colorScheme.outline + MqttConnectionState.DISCONNECTED -> + stringResource(Res.string.mqtt_status_disconnected) to MaterialTheme.colorScheme.error + MqttConnectionState.CONNECTING -> stringResource(Res.string.mqtt_status_connecting) to AmberColor + MqttConnectionState.CONNECTED -> stringResource(Res.string.mqtt_status_connected) to GreenColor + MqttConnectionState.RECONNECTING -> stringResource(Res.string.mqtt_status_reconnecting) to AmberColor + } + Row( + verticalAlignment = Alignment.CenterVertically, + horizontalArrangement = Arrangement.spacedBy(8.dp), + modifier = Modifier.padding(horizontal = 4.dp), + ) { + Box(modifier = Modifier.size(10.dp).clip(CircleShape).background(color)) + Text( + text = label, + style = MaterialTheme.typography.labelLarge, + color = MaterialTheme.colorScheme.onSurfaceVariant, + ) + } +} diff --git a/feature/settings/src/commonTest/kotlin/org/meshtastic/feature/settings/radio/RadioConfigViewModelTest.kt b/feature/settings/src/commonTest/kotlin/org/meshtastic/feature/settings/radio/RadioConfigViewModelTest.kt index 167daebbf..6e11f6b92 100644 --- a/feature/settings/src/commonTest/kotlin/org/meshtastic/feature/settings/radio/RadioConfigViewModelTest.kt +++ b/feature/settings/src/commonTest/kotlin/org/meshtastic/feature/settings/radio/RadioConfigViewModelTest.kt @@ -53,6 +53,7 @@ import org.meshtastic.core.repository.HomoglyphPrefs import org.meshtastic.core.repository.LocationRepository import org.meshtastic.core.repository.LocationService import org.meshtastic.core.repository.MapConsentPrefs +import org.meshtastic.core.repository.MqttManager import org.meshtastic.core.repository.PacketRepository import org.meshtastic.core.repository.RadioConfigRepository import org.meshtastic.core.repository.ServiceRepository @@ -99,6 +100,7 @@ class RadioConfigViewModelTest { private val processRadioResponseUseCase: ProcessRadioResponseUseCase = mock(MockMode.autofill) private val locationService: LocationService = mock(MockMode.autofill) private val fileService: FileService = mock(MockMode.autofill) + private val mqttManager: MqttManager = mock(MockMode.autofill) private val uiPrefs: UiPrefs = mock(MockMode.autofill) private lateinit var viewModel: RadioConfigViewModel @@ -121,6 +123,9 @@ class RadioConfigViewModelTest { every { serviceRepository.connectionState } returns MutableStateFlow(org.meshtastic.core.model.ConnectionState.Connected) + every { mqttManager.mqttConnectionState } returns + MutableStateFlow(org.meshtastic.core.model.MqttConnectionState.INACTIVE) + every { uiPrefs.showQuickChat } returns MutableStateFlow(false) viewModel = createViewModel() @@ -152,6 +157,7 @@ class RadioConfigViewModelTest { processRadioResponseUseCase = processRadioResponseUseCase, locationService = locationService, fileService = fileService, + mqttManager = mqttManager, ) @Test diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 5098b40b2..6308bd2f6 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -74,7 +74,7 @@ spotless = "8.4.0" wire = "6.2.0" vico = "3.1.0" kable = "0.42.0" -kmqtt = "1.0.0" +mqttastic = "0.1.0" jmdns = "3.6.3" qrcode-kotlin = "4.5.0" @@ -222,8 +222,7 @@ markdown-renderer-android = { module = "com.mikepenz:multiplatform-markdown-rend material = { module = "com.google.android.material:material", version = "1.13.0" } kable-core = { module = "com.juul.kable:kable-core", version.ref = "kable" } -kmqtt-client = { module = "io.github.davidepianca98:kmqtt-client", version.ref = "kmqtt" } -kmqtt-common = { module = "io.github.davidepianca98:kmqtt-common", version.ref = "kmqtt" } +meshtastic-mqtt-client = { module = "org.meshtastic:mqtt-client", version.ref = "mqttastic" } jserialcomm = { module = "com.fazecast:jSerialComm", version.ref = "jserialcomm" } okio = { module = "com.squareup.okio:okio", version.ref = "okio" }