From 172680fd46c4fc8a2c8fe2e7c6e935d3ace2da9c Mon Sep 17 00:00:00 2001 From: James Rich <2199651+jamesarich@users.noreply.github.com> Date: Sat, 11 Apr 2026 18:38:33 -0500 Subject: [PATCH] fix(mqtt): replace yield() with proper connection readiness signal (#5073) --- .../network/repository/MQTTRepositoryImpl.kt | 48 +++++++++---------- 1 file changed, 24 insertions(+), 24 deletions(-) diff --git a/core/network/src/commonMain/kotlin/org/meshtastic/core/network/repository/MQTTRepositoryImpl.kt b/core/network/src/commonMain/kotlin/org/meshtastic/core/network/repository/MQTTRepositoryImpl.kt index 56d70d453..6be47c8eb 100644 --- a/core/network/src/commonMain/kotlin/org/meshtastic/core/network/repository/MQTTRepositoryImpl.kt +++ b/core/network/src/commonMain/kotlin/org/meshtastic/core/network/repository/MQTTRepositoryImpl.kt @@ -145,6 +145,30 @@ class MQTTRepositoryImpl( client = newClient + // Subscribe before starting the event loop. KMQTT's subscribe() calls send(), + // which queues the SUBSCRIBE packet in pendingSendMessages while connackReceived + // is false. Once the event loop receives CONNACK, it flushes the queue — so + // subscriptions are guaranteed to be sent immediately after the connection is + // established, with no timing races. This replaces a previous yield()-based + // approach that was unreliable on lightly loaded dispatchers. + val subscriptions = mutableListOf() + channelSet.subscribeList.forEach { globalId -> + subscriptions.add( + Subscription("$rootTopic$DEFAULT_TOPIC_LEVEL$globalId/+", SubscriptionOptions(Qos.AT_LEAST_ONCE)), + ) + if (mqttConfig?.json_enabled == true) { + subscriptions.add( + Subscription("$rootTopic$JSON_TOPIC_LEVEL$globalId/+", SubscriptionOptions(Qos.AT_LEAST_ONCE)), + ) + } + } + subscriptions.add(Subscription("$rootTopic${DEFAULT_TOPIC_LEVEL}PKI/+", SubscriptionOptions(Qos.AT_LEAST_ONCE))) + + if (subscriptions.isNotEmpty()) { + Logger.d { "MQTT subscribing to ${subscriptions.size} topics" } + newClient.subscribe(subscriptions) + } + clientJob = scope.launch { var reconnectDelay = INITIAL_RECONNECT_DELAY_MS @@ -170,30 +194,6 @@ class MQTTRepositoryImpl( } } - // Subscriptions: placed after runSuspend is launched and has had time to establish - // the TCP connection. KMQTT's subscribe() queues internally, but subscribing before - // the connection is ready may silently drop subscriptions depending on the version. - // A brief yield gives runSuspend() time to connect before we subscribe. - kotlinx.coroutines.yield() - - val subscriptions = mutableListOf() - channelSet.subscribeList.forEach { globalId -> - subscriptions.add( - Subscription("$rootTopic$DEFAULT_TOPIC_LEVEL$globalId/+", SubscriptionOptions(Qos.AT_LEAST_ONCE)), - ) - if (mqttConfig?.json_enabled == true) { - subscriptions.add( - Subscription("$rootTopic$JSON_TOPIC_LEVEL$globalId/+", SubscriptionOptions(Qos.AT_LEAST_ONCE)), - ) - } - } - subscriptions.add(Subscription("$rootTopic${DEFAULT_TOPIC_LEVEL}PKI/+", SubscriptionOptions(Qos.AT_LEAST_ONCE))) - - if (subscriptions.isNotEmpty()) { - Logger.d { "MQTT subscribing to ${subscriptions.size} topics" } - newClient.subscribe(subscriptions) - } - awaitClose { disconnect() } }