refactor(coroutines): migrate to kotlinx-coroutines 1.11.0-rc02 (#5312)

Signed-off-by: James Rich <2199651+jamesarich@users.noreply.github.com>
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
This commit is contained in:
James Rich
2026-04-30 22:11:22 -05:00
committed by GitHub
parent 2822290908
commit e198f52de5
21 changed files with 64 additions and 49 deletions

View File

@@ -33,11 +33,11 @@ import io.ktor.client.request.get
import io.ktor.client.statement.bodyAsChannel
import io.ktor.http.isSuccess
import io.ktor.utils.io.jvm.javaio.toInputStream
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.asSharedFlow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.asStateFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.mapNotNull
@@ -135,7 +135,7 @@ class MapViewModel(
val theme: StateFlow<Int> = uiPrefs.theme
private val _errorFlow = MutableSharedFlow<String>()
val errorFlow: SharedFlow<String> = _errorFlow.asSharedFlow()
val errorFlow: Flow<String> = _errorFlow.asFlow()
val customTileProviderConfigs: StateFlow<List<CustomTileProviderConfig>> =
customTileProviderRepository.getCustomTileProviders().stateInWhileSubscribed(initialValue = emptyList())

View File

@@ -20,8 +20,10 @@ import co.touchlab.kermit.Logger
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.Job
import kotlinx.coroutines.TimeoutCancellationException
import kotlinx.coroutines.asDeferred
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.consumeAsFlow
@@ -260,7 +262,7 @@ class PacketHandlerImpl(
}
@Suppress("TooGenericExceptionCaught")
private suspend fun sendPacket(packet: MeshPacket): CompletableDeferred<Boolean> {
private suspend fun sendPacket(packet: MeshPacket): Deferred<Boolean> {
// Reuse a deferred pre-registered by sendToRadioAndAwait, or create a new one.
val deferred = responseMutex.withLock { queueResponse.getOrPut(packet.id) { CompletableDeferred() } }
try {
@@ -275,7 +277,10 @@ class PacketHandlerImpl(
Logger.e(ex) { "sendToRadio error: ${ex.message}" }
deferred.complete(false)
}
return deferred
// Return a read-only Deferred view (kotlinx.coroutines 1.11+) so callers can await it
// without being able to complete the underlying CompletableDeferred; cancellation is
// still exposed via Deferred/Job.
return deferred.asDeferred()
}
private fun insertMeshLog(packetToSave: MeshLog) {

View File

@@ -24,8 +24,7 @@ import kotlinx.collections.immutable.persistentMapOf
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.asSharedFlow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.distinctUntilChanged
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.flow
@@ -59,7 +58,7 @@ class SessionManagerImpl(private val clock: Clock) : SessionManager {
private val entries = atomic<PersistentMap<Int, SessionEntry>>(persistentMapOf())
private val refreshFlow = MutableSharedFlow<Int>(extraBufferCapacity = REFRESH_BUFFER)
override val sessionRefreshFlow: SharedFlow<Int> = refreshFlow.asSharedFlow()
override val sessionRefreshFlow: Flow<Int> = refreshFlow.asFlow()
override fun recordSession(srcNodeNum: Int, passkey: ByteString) {
if (passkey.size == 0) return

View File

@@ -19,7 +19,7 @@ package org.meshtastic.core.data.manager
import co.touchlab.kermit.Logger
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.asSharedFlow
import kotlinx.coroutines.flow.asFlow
import org.koin.core.annotation.Single
import org.meshtastic.core.common.util.nowMillis
import org.meshtastic.core.repository.PacketHandler
@@ -48,7 +48,7 @@ class XModemManagerImpl(private val packetHandler: PacketHandler) : XModemManage
extraBufferCapacity = 4,
onBufferOverflow = BufferOverflow.DROP_OLDEST,
)
override val fileTransferFlow = _fileTransferFlow.asSharedFlow()
override val fileTransferFlow = _fileTransferFlow.asFlow()
// --- mutable state ---
// Thread-safety contract: [handleIncomingXModem] is called sequentially from

View File

@@ -182,6 +182,8 @@ class BleRadioTransport(
}
}
if (d != null) return d
} catch (e: CancellationException) {
throw e
} catch (e: Exception) {
Logger.v(e) { "[$address] Scan attempt failed or timed out" }
}
@@ -243,6 +245,8 @@ class BleRadioTransport(
try {
bluetoothRepository.bond(device)
Logger.i { "[$address] Bonding successful" }
} catch (e: CancellationException) {
throw e
} catch (e: Exception) {
Logger.w(e) { "[$address] Bonding failed, attempting connection anyway" }
}
@@ -301,6 +305,8 @@ class BleRadioTransport(
val rssi = retryBleOperation(tag = address) { device.readRssi() }
Logger.d { "[$address] Connection confirmed. Initial RSSI: $rssi dBm" }
}
} catch (e: CancellationException) {
throw e
} catch (e: Exception) {
Logger.w(e) { "[$address] Failed to read initial connection RSSI" }
}

View File

@@ -18,7 +18,6 @@ package org.meshtastic.core.repository
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.StateFlow
import org.meshtastic.core.model.ConnectionState
import org.meshtastic.core.model.DeviceType
@@ -79,7 +78,7 @@ interface RadioInterfaceService : RadioTransportCallback {
val receivedData: Flow<ByteArray>
/** Flow of radio activity events. */
val meshActivity: SharedFlow<MeshActivity>
val meshActivity: Flow<MeshActivity>
/**
* Drains any bytes currently buffered in [receivedData] without emitting them to collectors.
@@ -112,7 +111,7 @@ interface RadioInterfaceService : RadioTransportCallback {
fun toInterfaceAddress(interfaceId: InterfaceId, rest: String): String
/** Flow of user-facing connection error messages (e.g. permission failures). */
val connectionError: SharedFlow<String>
val connectionError: Flow<String>
/** The scope in which interface-related coroutines should run. */
val serviceScope: CoroutineScope

View File

@@ -18,7 +18,6 @@ package org.meshtastic.core.repository
import co.touchlab.kermit.Severity
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.StateFlow
import org.meshtastic.core.model.ConnectionState
import org.meshtastic.core.model.service.ServiceAction
@@ -121,9 +120,11 @@ interface ServiceRepository {
/**
* Flow of all raw [MeshPacket] objects received from the mesh.
*
* Subscribing to this flow allows components to react to any incoming traffic.
* Subscribing to this flow allows components to react to any incoming traffic. The underlying implementation may be
* backed by a hot shared flow, but this API intentionally exposes only the [Flow] interface. That implementation
* detail is hidden via [kotlinx.coroutines.flow.SharedFlow.asFlow] (kotlinx.coroutines 1.11+).
*/
val meshPacketFlow: SharedFlow<MeshPacket>
val meshPacketFlow: Flow<MeshPacket>
/**
* Emits a mesh packet into the flow.

View File

@@ -17,7 +17,6 @@
package org.meshtastic.core.repository
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.SharedFlow
import okio.ByteString
import org.meshtastic.core.model.SessionStatus
@@ -49,9 +48,10 @@ interface SessionManager {
* Used by `EnsureRemoteAdminSessionUseCase` to await a session refresh from a specific node without polling.
*
* Backed by a `MutableSharedFlow` with no replay; subscribers must subscribe **before** dispatching the request
* that triggers the refresh.
* that triggers the refresh. Exposed as a [Flow] via [kotlinx.coroutines.flow.SharedFlow.asFlow]
* (kotlinx.coroutines 1.11+) to hide the `SharedFlow` API from consumers while preserving hot-stream semantics.
*/
val sessionRefreshFlow: SharedFlow<Int>
val sessionRefreshFlow: Flow<Int>
/**
* Cold per-node [SessionStatus] flow. Emits the current status synchronously on subscription and re-emits whenever

View File

@@ -23,7 +23,7 @@ import co.touchlab.kermit.Severity
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.asFlow
import org.junit.Before
import org.junit.Test
import org.junit.runner.RunWith
@@ -78,7 +78,7 @@ class ServiceBroadcastsTest {
override val errorMessage = MutableStateFlow<String?>(null)
override val connectionProgress = MutableStateFlow<String?>(null)
private val meshPackets = MutableSharedFlow<MeshPacket>()
override val meshPacketFlow: SharedFlow<MeshPacket> = meshPackets
override val meshPacketFlow: Flow<MeshPacket> = meshPackets.asFlow()
override val tracerouteResponse = MutableStateFlow<TracerouteResponse?>(null)
override val neighborInfoResponse = MutableStateFlow<String?>(null)
private val serviceActions = MutableSharedFlow<ServiceAction>()

View File

@@ -22,8 +22,8 @@ import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.receiveAsFlow
import org.meshtastic.core.model.ConnectionState
import org.meshtastic.core.model.service.ServiceAction
@@ -88,8 +88,8 @@ open class ServiceRepositoryImpl : ServiceRepository {
}
private val _meshPacketFlow = MutableSharedFlow<MeshPacket>(extraBufferCapacity = 64)
override val meshPacketFlow: SharedFlow<MeshPacket>
get() = _meshPacketFlow
override val meshPacketFlow: Flow<MeshPacket>
get() = _meshPacketFlow.asFlow()
override suspend fun emitMeshPacket(packet: MeshPacket) {
_meshPacketFlow.emit(packet)

View File

@@ -30,9 +30,8 @@ import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.asSharedFlow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.asStateFlow
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.launchIn
@@ -109,10 +108,10 @@ class SharedRadioInterfaceService(
private val _meshActivity =
MutableSharedFlow<MeshActivity>(extraBufferCapacity = 64, onBufferOverflow = BufferOverflow.DROP_OLDEST)
override val meshActivity: SharedFlow<MeshActivity> = _meshActivity.asSharedFlow()
override val meshActivity: Flow<MeshActivity> = _meshActivity.asFlow()
private val _connectionError = MutableSharedFlow<String>(extraBufferCapacity = 64)
override val connectionError: SharedFlow<String> = _connectionError.asSharedFlow()
override val connectionError: Flow<String> = _connectionError.asFlow()
override val serviceScope: CoroutineScope
get() = _serviceScope

View File

@@ -20,11 +20,11 @@ import co.touchlab.kermit.Logger
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.asSharedFlow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.asStateFlow
import kotlinx.coroutines.flow.consumeAsFlow
import kotlinx.coroutines.launch
@@ -35,7 +35,7 @@ import org.meshtastic.core.model.Node
interface TAKServerManager {
val isRunning: StateFlow<Boolean>
val connectionCount: StateFlow<Int>
val inboundMessages: SharedFlow<CoTMessage>
val inboundMessages: Flow<CoTMessage>
/** Start the TAK server using [scope]. Port is fixed at [TAKServer] construction time. */
fun start(scope: CoroutineScope)
@@ -59,7 +59,7 @@ class TAKServerManagerImpl(private val takServer: TAKServer) : TAKServerManager
override val connectionCount: StateFlow<Int> = takServer.connectionCount
private val _inboundMessages = MutableSharedFlow<CoTMessage>()
override val inboundMessages: SharedFlow<CoTMessage> = _inboundMessages.asSharedFlow()
override val inboundMessages: Flow<CoTMessage> = _inboundMessages.asFlow()
// Unbounded channel preserves FIFO ordering of inbound CoT messages under load.
// onMessage is a non-suspend callback, so we trySend (always succeeds for UNLIMITED)

View File

@@ -22,8 +22,8 @@ import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.receiveAsFlow
import org.meshtastic.core.model.ConnectionState
import org.meshtastic.core.model.DeviceType
@@ -57,10 +57,10 @@ class FakeRadioInterfaceService(override val serviceScope: CoroutineScope = Main
override val receivedData: Flow<ByteArray> = _receivedData.receiveAsFlow()
private val _meshActivity = MutableSharedFlow<MeshActivity>()
override val meshActivity: SharedFlow<MeshActivity> = _meshActivity
override val meshActivity: Flow<MeshActivity> = _meshActivity.asFlow()
private val _connectionError = MutableSharedFlow<String>()
override val connectionError: SharedFlow<String> = _connectionError
override val connectionError: Flow<String> = _connectionError.asFlow()
val sentToRadio = mutableListOf<ByteArray>()
var connectCalled = false

View File

@@ -20,8 +20,8 @@ import co.touchlab.kermit.Severity
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.asFlow
import org.meshtastic.core.model.ConnectionState
import org.meshtastic.core.model.service.ServiceAction
import org.meshtastic.core.model.service.TracerouteResponse
@@ -69,7 +69,7 @@ class FakeServiceRepository : ServiceRepository {
}
private val _meshPacketFlow = MutableSharedFlow<MeshPacket>()
override val meshPacketFlow: SharedFlow<MeshPacket> = _meshPacketFlow
override val meshPacketFlow: Flow<MeshPacket> = _meshPacketFlow.asFlow()
override suspend fun emitMeshPacket(packet: MeshPacket) {
_meshPacketFlow.emit(packet)

View File

@@ -25,6 +25,7 @@ import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.asSharedFlow
import kotlinx.coroutines.flow.asStateFlow
import kotlinx.coroutines.flow.filterNotNull
@@ -133,7 +134,7 @@ class UIViewModel(
private val _scrollToTopEventFlow =
MutableSharedFlow<ScrollToTopEvent>(extraBufferCapacity = 1, onBufferOverflow = BufferOverflow.DROP_OLDEST)
val scrollToTopEventFlow: Flow<ScrollToTopEvent> = _scrollToTopEventFlow.asSharedFlow()
val scrollToTopEventFlow: Flow<ScrollToTopEvent> = _scrollToTopEventFlow.asFlow()
fun emitScrollToTopEvent(event: ScrollToTopEvent) {
_scrollToTopEventFlow.tryEmit(event)

View File

@@ -129,7 +129,8 @@ fun safeLaunch(
}
/**
* Creates and returns a [MutableSharedFlow] intended for one-shot error events. Expose as `SharedFlow` via
* [asSharedFlow] in the ViewModel, and collect in the UI to show snackbars or toasts.
* Creates and returns a [MutableSharedFlow] intended for one-shot error events. Expose as `Flow` via
* [kotlinx.coroutines.flow.asFlow] in the ViewModel (hiding hot-flow semantics), and collect in the UI to show
* snackbars or toasts.
*/
fun errorEventFlow(): MutableSharedFlow<UiText> = MutableSharedFlow(extraBufferCapacity = 1)

View File

@@ -25,6 +25,7 @@ import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.emptyFlow
import org.meshtastic.core.model.ConnectionState
import org.meshtastic.core.model.DataPacket
@@ -74,8 +75,8 @@ class NoopRadioInterfaceService : RadioInterfaceService {
override fun isMockTransport(): Boolean = false
override val receivedData = MutableSharedFlow<ByteArray>()
override val meshActivity = MutableSharedFlow<MeshActivity>()
override val connectionError = MutableSharedFlow<String>()
override val meshActivity: Flow<MeshActivity> = MutableSharedFlow<MeshActivity>().asFlow()
override val connectionError: Flow<String> = MutableSharedFlow<String>().asFlow()
override fun sendToRadio(bytes: ByteArray) {
logWarn("NoopRadioInterfaceService.sendToRadio(${bytes.size} bytes)")

View File

@@ -20,6 +20,7 @@ import androidx.lifecycle.ViewModel
import androidx.lifecycle.viewModelScope
import co.touchlab.kermit.Logger
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.Job
import kotlinx.coroutines.NonCancellable
import kotlinx.coroutines.delay
@@ -32,6 +33,7 @@ import kotlinx.coroutines.flow.filterNotNull
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.launch
import kotlinx.coroutines.withContext
import kotlinx.coroutines.withTimeoutOrNull
import org.jetbrains.compose.resources.StringResource
import org.koin.core.annotation.KoinViewModel
@@ -127,10 +129,10 @@ class FirmwareUpdateViewModel(
override fun onCleared() {
super.onCleared()
// viewModelScope is already cancelled when onCleared() runs, so launch cleanup on the
// application-wide scope (SupervisorJob + ioDispatcher). NonCancellable keeps cleanup
// running even if something tries to cancel it mid-flight.
applicationScope.launch(NonCancellable) {
tempFirmwareFile = cleanupTemporaryFiles(fileHandler, tempFirmwareFile)
// application-wide scope (SupervisorJob + ioDispatcher). ATOMIC start + NonCancellable
// context keeps cleanup running even if something tries to cancel it mid-flight.
applicationScope.launch(start = CoroutineStart.ATOMIC) {
withContext(NonCancellable) { tempFirmwareFile = cleanupTemporaryFiles(fileHandler, tempFirmwareFile) }
}
}

View File

@@ -142,7 +142,7 @@ class LegacyDfuTransport(
// Best-effort DFU Version read — gate out unsupported old bootloaders (SDK ≤ 6).
val versionChar = service.characteristic(LEGACY_DFU_VERSION_UUID)
val version =
runCatching { service.read(versionChar) }
safeCatching { service.read(versionChar) }
.map { bytes ->
if (bytes.size >= 2) (bytes[0].toInt() and 0xFF) or ((bytes[1].toInt() and 0xFF) shl 8) else -1
}

View File

@@ -35,6 +35,7 @@ import org.jetbrains.compose.resources.StringResource
import org.koin.core.annotation.InjectedParam
import org.koin.core.annotation.KoinViewModel
import org.meshtastic.core.common.util.CommonUri
import org.meshtastic.core.common.util.safeCatching
import org.meshtastic.core.domain.usecase.settings.AdminActionsUseCase
import org.meshtastic.core.domain.usecase.settings.ExportProfileUseCase
import org.meshtastic.core.domain.usecase.settings.ExportSecurityConfigUseCase
@@ -165,7 +166,7 @@ open class RadioConfigViewModel(
probeJob =
viewModelScope.launch {
val result =
runCatching { mqttManager.probe(address, tlsEnabled, username, password) }
safeCatching { mqttManager.probe(address, tlsEnabled, username, password) }
.getOrElse { e ->
Logger.w(e) { "MQTT probe threw" }
MqttProbeStatus.Other(message = e.message)

View File

@@ -19,7 +19,7 @@ koin-plugin = "1.0.0-RC2"
# Kotlin
kotlin = "2.3.21"
kotlinx-coroutines-android = "1.10.2"
kotlinx-coroutines-android = "1.11.0-rc02"
kotlinx-datetime = "0.7.1"
kotlinx-serialization = "1.11.0"
ktlint = "1.7.1"