mirror of
https://github.com/meshtastic/Meshtastic-Android.git
synced 2026-04-14 10:58:59 -04:00
fix(mqtt): replace yield() with proper connection readiness signal (#5073)
This commit is contained in:
@@ -145,6 +145,30 @@ class MQTTRepositoryImpl(
|
||||
|
||||
client = newClient
|
||||
|
||||
// Subscribe before starting the event loop. KMQTT's subscribe() calls send(),
|
||||
// which queues the SUBSCRIBE packet in pendingSendMessages while connackReceived
|
||||
// is false. Once the event loop receives CONNACK, it flushes the queue — so
|
||||
// subscriptions are guaranteed to be sent immediately after the connection is
|
||||
// established, with no timing races. This replaces a previous yield()-based
|
||||
// approach that was unreliable on lightly loaded dispatchers.
|
||||
val subscriptions = mutableListOf<Subscription>()
|
||||
channelSet.subscribeList.forEach { globalId ->
|
||||
subscriptions.add(
|
||||
Subscription("$rootTopic$DEFAULT_TOPIC_LEVEL$globalId/+", SubscriptionOptions(Qos.AT_LEAST_ONCE)),
|
||||
)
|
||||
if (mqttConfig?.json_enabled == true) {
|
||||
subscriptions.add(
|
||||
Subscription("$rootTopic$JSON_TOPIC_LEVEL$globalId/+", SubscriptionOptions(Qos.AT_LEAST_ONCE)),
|
||||
)
|
||||
}
|
||||
}
|
||||
subscriptions.add(Subscription("$rootTopic${DEFAULT_TOPIC_LEVEL}PKI/+", SubscriptionOptions(Qos.AT_LEAST_ONCE)))
|
||||
|
||||
if (subscriptions.isNotEmpty()) {
|
||||
Logger.d { "MQTT subscribing to ${subscriptions.size} topics" }
|
||||
newClient.subscribe(subscriptions)
|
||||
}
|
||||
|
||||
clientJob =
|
||||
scope.launch {
|
||||
var reconnectDelay = INITIAL_RECONNECT_DELAY_MS
|
||||
@@ -170,30 +194,6 @@ class MQTTRepositoryImpl(
|
||||
}
|
||||
}
|
||||
|
||||
// Subscriptions: placed after runSuspend is launched and has had time to establish
|
||||
// the TCP connection. KMQTT's subscribe() queues internally, but subscribing before
|
||||
// the connection is ready may silently drop subscriptions depending on the version.
|
||||
// A brief yield gives runSuspend() time to connect before we subscribe.
|
||||
kotlinx.coroutines.yield()
|
||||
|
||||
val subscriptions = mutableListOf<Subscription>()
|
||||
channelSet.subscribeList.forEach { globalId ->
|
||||
subscriptions.add(
|
||||
Subscription("$rootTopic$DEFAULT_TOPIC_LEVEL$globalId/+", SubscriptionOptions(Qos.AT_LEAST_ONCE)),
|
||||
)
|
||||
if (mqttConfig?.json_enabled == true) {
|
||||
subscriptions.add(
|
||||
Subscription("$rootTopic$JSON_TOPIC_LEVEL$globalId/+", SubscriptionOptions(Qos.AT_LEAST_ONCE)),
|
||||
)
|
||||
}
|
||||
}
|
||||
subscriptions.add(Subscription("$rootTopic${DEFAULT_TOPIC_LEVEL}PKI/+", SubscriptionOptions(Qos.AT_LEAST_ONCE)))
|
||||
|
||||
if (subscriptions.isNotEmpty()) {
|
||||
Logger.d { "MQTT subscribing to ${subscriptions.size} topics" }
|
||||
newClient.subscribe(subscriptions)
|
||||
}
|
||||
|
||||
awaitClose { disconnect() }
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user