From e198f52de5ee5dd257e6524f4c159e0afe722a08 Mon Sep 17 00:00:00 2001 From: James Rich <2199651+jamesarich@users.noreply.github.com> Date: Thu, 30 Apr 2026 22:11:22 -0500 Subject: [PATCH] refactor(coroutines): migrate to kotlinx-coroutines 1.11.0-rc02 (#5312) Signed-off-by: James Rich <2199651+jamesarich@users.noreply.github.com> Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> --- .../kotlin/org/meshtastic/app/map/MapViewModel.kt | 6 +++--- .../meshtastic/core/data/manager/PacketHandlerImpl.kt | 9 +++++++-- .../meshtastic/core/data/manager/SessionManagerImpl.kt | 5 ++--- .../meshtastic/core/data/manager/XModemManagerImpl.kt | 4 ++-- .../meshtastic/core/network/radio/BleRadioTransport.kt | 6 ++++++ .../core/repository/RadioInterfaceService.kt | 5 ++--- .../meshtastic/core/repository/ServiceRepository.kt | 7 ++++--- .../org/meshtastic/core/repository/SessionManager.kt | 6 +++--- .../meshtastic/core/service/ServiceBroadcastsTest.kt | 4 ++-- .../meshtastic/core/service/ServiceRepositoryImpl.kt | 6 +++--- .../core/service/SharedRadioInterfaceService.kt | 7 +++---- .../org/meshtastic/core/takserver/TAKServerManager.kt | 8 ++++---- .../core/testing/FakeRadioInterfaceService.kt | 6 +++--- .../meshtastic/core/testing/FakeServiceRepository.kt | 4 ++-- .../org/meshtastic/core/ui/viewmodel/UIViewModel.kt | 3 ++- .../core/ui/viewmodel/ViewModelExtensions.kt | 5 +++-- .../kotlin/org/meshtastic/desktop/stub/NoopStubs.kt | 5 +++-- .../feature/firmware/FirmwareUpdateViewModel.kt | 10 ++++++---- .../feature/firmware/ota/dfu/LegacyDfuTransport.kt | 2 +- .../feature/settings/radio/RadioConfigViewModel.kt | 3 ++- gradle/libs.versions.toml | 2 +- 21 files changed, 64 insertions(+), 49 deletions(-) diff --git a/app/src/google/kotlin/org/meshtastic/app/map/MapViewModel.kt b/app/src/google/kotlin/org/meshtastic/app/map/MapViewModel.kt index 49846a178..8a4a798a8 100644 --- a/app/src/google/kotlin/org/meshtastic/app/map/MapViewModel.kt +++ b/app/src/google/kotlin/org/meshtastic/app/map/MapViewModel.kt @@ -33,11 +33,11 @@ import io.ktor.client.request.get import io.ktor.client.statement.bodyAsChannel import io.ktor.http.isSuccess import io.ktor.utils.io.jvm.javaio.toInputStream +import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.MutableSharedFlow import kotlinx.coroutines.flow.MutableStateFlow -import kotlinx.coroutines.flow.SharedFlow import kotlinx.coroutines.flow.StateFlow -import kotlinx.coroutines.flow.asSharedFlow +import kotlinx.coroutines.flow.asFlow import kotlinx.coroutines.flow.asStateFlow import kotlinx.coroutines.flow.first import kotlinx.coroutines.flow.mapNotNull @@ -135,7 +135,7 @@ class MapViewModel( val theme: StateFlow = uiPrefs.theme private val _errorFlow = MutableSharedFlow() - val errorFlow: SharedFlow = _errorFlow.asSharedFlow() + val errorFlow: Flow = _errorFlow.asFlow() val customTileProviderConfigs: StateFlow> = customTileProviderRepository.getCustomTileProviders().stateInWhileSubscribed(initialValue = emptyList()) diff --git a/core/data/src/commonMain/kotlin/org/meshtastic/core/data/manager/PacketHandlerImpl.kt b/core/data/src/commonMain/kotlin/org/meshtastic/core/data/manager/PacketHandlerImpl.kt index 2016d4da7..aa62b76b9 100644 --- a/core/data/src/commonMain/kotlin/org/meshtastic/core/data/manager/PacketHandlerImpl.kt +++ b/core/data/src/commonMain/kotlin/org/meshtastic/core/data/manager/PacketHandlerImpl.kt @@ -20,8 +20,10 @@ import co.touchlab.kermit.Logger import kotlinx.coroutines.CancellationException import kotlinx.coroutines.CompletableDeferred import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Deferred import kotlinx.coroutines.Job import kotlinx.coroutines.TimeoutCancellationException +import kotlinx.coroutines.asDeferred import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.delay import kotlinx.coroutines.flow.consumeAsFlow @@ -260,7 +262,7 @@ class PacketHandlerImpl( } @Suppress("TooGenericExceptionCaught") - private suspend fun sendPacket(packet: MeshPacket): CompletableDeferred { + private suspend fun sendPacket(packet: MeshPacket): Deferred { // Reuse a deferred pre-registered by sendToRadioAndAwait, or create a new one. val deferred = responseMutex.withLock { queueResponse.getOrPut(packet.id) { CompletableDeferred() } } try { @@ -275,7 +277,10 @@ class PacketHandlerImpl( Logger.e(ex) { "sendToRadio error: ${ex.message}" } deferred.complete(false) } - return deferred + // Return a read-only Deferred view (kotlinx.coroutines 1.11+) so callers can await it + // without being able to complete the underlying CompletableDeferred; cancellation is + // still exposed via Deferred/Job. + return deferred.asDeferred() } private fun insertMeshLog(packetToSave: MeshLog) { diff --git a/core/data/src/commonMain/kotlin/org/meshtastic/core/data/manager/SessionManagerImpl.kt b/core/data/src/commonMain/kotlin/org/meshtastic/core/data/manager/SessionManagerImpl.kt index 892b953d4..9a9c290f5 100644 --- a/core/data/src/commonMain/kotlin/org/meshtastic/core/data/manager/SessionManagerImpl.kt +++ b/core/data/src/commonMain/kotlin/org/meshtastic/core/data/manager/SessionManagerImpl.kt @@ -24,8 +24,7 @@ import kotlinx.collections.immutable.persistentMapOf import kotlinx.coroutines.delay import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.MutableSharedFlow -import kotlinx.coroutines.flow.SharedFlow -import kotlinx.coroutines.flow.asSharedFlow +import kotlinx.coroutines.flow.asFlow import kotlinx.coroutines.flow.distinctUntilChanged import kotlinx.coroutines.flow.filter import kotlinx.coroutines.flow.flow @@ -59,7 +58,7 @@ class SessionManagerImpl(private val clock: Clock) : SessionManager { private val entries = atomic>(persistentMapOf()) private val refreshFlow = MutableSharedFlow(extraBufferCapacity = REFRESH_BUFFER) - override val sessionRefreshFlow: SharedFlow = refreshFlow.asSharedFlow() + override val sessionRefreshFlow: Flow = refreshFlow.asFlow() override fun recordSession(srcNodeNum: Int, passkey: ByteString) { if (passkey.size == 0) return diff --git a/core/data/src/commonMain/kotlin/org/meshtastic/core/data/manager/XModemManagerImpl.kt b/core/data/src/commonMain/kotlin/org/meshtastic/core/data/manager/XModemManagerImpl.kt index 42d4f3ae1..5858d19ba 100644 --- a/core/data/src/commonMain/kotlin/org/meshtastic/core/data/manager/XModemManagerImpl.kt +++ b/core/data/src/commonMain/kotlin/org/meshtastic/core/data/manager/XModemManagerImpl.kt @@ -19,7 +19,7 @@ package org.meshtastic.core.data.manager import co.touchlab.kermit.Logger import kotlinx.coroutines.channels.BufferOverflow import kotlinx.coroutines.flow.MutableSharedFlow -import kotlinx.coroutines.flow.asSharedFlow +import kotlinx.coroutines.flow.asFlow import org.koin.core.annotation.Single import org.meshtastic.core.common.util.nowMillis import org.meshtastic.core.repository.PacketHandler @@ -48,7 +48,7 @@ class XModemManagerImpl(private val packetHandler: PacketHandler) : XModemManage extraBufferCapacity = 4, onBufferOverflow = BufferOverflow.DROP_OLDEST, ) - override val fileTransferFlow = _fileTransferFlow.asSharedFlow() + override val fileTransferFlow = _fileTransferFlow.asFlow() // --- mutable state --- // Thread-safety contract: [handleIncomingXModem] is called sequentially from diff --git a/core/network/src/commonMain/kotlin/org/meshtastic/core/network/radio/BleRadioTransport.kt b/core/network/src/commonMain/kotlin/org/meshtastic/core/network/radio/BleRadioTransport.kt index 1d4161557..3a96481b1 100644 --- a/core/network/src/commonMain/kotlin/org/meshtastic/core/network/radio/BleRadioTransport.kt +++ b/core/network/src/commonMain/kotlin/org/meshtastic/core/network/radio/BleRadioTransport.kt @@ -182,6 +182,8 @@ class BleRadioTransport( } } if (d != null) return d + } catch (e: CancellationException) { + throw e } catch (e: Exception) { Logger.v(e) { "[$address] Scan attempt failed or timed out" } } @@ -243,6 +245,8 @@ class BleRadioTransport( try { bluetoothRepository.bond(device) Logger.i { "[$address] Bonding successful" } + } catch (e: CancellationException) { + throw e } catch (e: Exception) { Logger.w(e) { "[$address] Bonding failed, attempting connection anyway" } } @@ -301,6 +305,8 @@ class BleRadioTransport( val rssi = retryBleOperation(tag = address) { device.readRssi() } Logger.d { "[$address] Connection confirmed. Initial RSSI: $rssi dBm" } } + } catch (e: CancellationException) { + throw e } catch (e: Exception) { Logger.w(e) { "[$address] Failed to read initial connection RSSI" } } diff --git a/core/repository/src/commonMain/kotlin/org/meshtastic/core/repository/RadioInterfaceService.kt b/core/repository/src/commonMain/kotlin/org/meshtastic/core/repository/RadioInterfaceService.kt index 3fe67d785..51d855494 100644 --- a/core/repository/src/commonMain/kotlin/org/meshtastic/core/repository/RadioInterfaceService.kt +++ b/core/repository/src/commonMain/kotlin/org/meshtastic/core/repository/RadioInterfaceService.kt @@ -18,7 +18,6 @@ package org.meshtastic.core.repository import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.SharedFlow import kotlinx.coroutines.flow.StateFlow import org.meshtastic.core.model.ConnectionState import org.meshtastic.core.model.DeviceType @@ -79,7 +78,7 @@ interface RadioInterfaceService : RadioTransportCallback { val receivedData: Flow /** Flow of radio activity events. */ - val meshActivity: SharedFlow + val meshActivity: Flow /** * Drains any bytes currently buffered in [receivedData] without emitting them to collectors. @@ -112,7 +111,7 @@ interface RadioInterfaceService : RadioTransportCallback { fun toInterfaceAddress(interfaceId: InterfaceId, rest: String): String /** Flow of user-facing connection error messages (e.g. permission failures). */ - val connectionError: SharedFlow + val connectionError: Flow /** The scope in which interface-related coroutines should run. */ val serviceScope: CoroutineScope diff --git a/core/repository/src/commonMain/kotlin/org/meshtastic/core/repository/ServiceRepository.kt b/core/repository/src/commonMain/kotlin/org/meshtastic/core/repository/ServiceRepository.kt index 03ad57306..2a09e95c8 100644 --- a/core/repository/src/commonMain/kotlin/org/meshtastic/core/repository/ServiceRepository.kt +++ b/core/repository/src/commonMain/kotlin/org/meshtastic/core/repository/ServiceRepository.kt @@ -18,7 +18,6 @@ package org.meshtastic.core.repository import co.touchlab.kermit.Severity import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.SharedFlow import kotlinx.coroutines.flow.StateFlow import org.meshtastic.core.model.ConnectionState import org.meshtastic.core.model.service.ServiceAction @@ -121,9 +120,11 @@ interface ServiceRepository { /** * Flow of all raw [MeshPacket] objects received from the mesh. * - * Subscribing to this flow allows components to react to any incoming traffic. + * Subscribing to this flow allows components to react to any incoming traffic. The underlying implementation may be + * backed by a hot shared flow, but this API intentionally exposes only the [Flow] interface. That implementation + * detail is hidden via [kotlinx.coroutines.flow.SharedFlow.asFlow] (kotlinx.coroutines 1.11+). */ - val meshPacketFlow: SharedFlow + val meshPacketFlow: Flow /** * Emits a mesh packet into the flow. diff --git a/core/repository/src/commonMain/kotlin/org/meshtastic/core/repository/SessionManager.kt b/core/repository/src/commonMain/kotlin/org/meshtastic/core/repository/SessionManager.kt index 077db0434..dad094c75 100644 --- a/core/repository/src/commonMain/kotlin/org/meshtastic/core/repository/SessionManager.kt +++ b/core/repository/src/commonMain/kotlin/org/meshtastic/core/repository/SessionManager.kt @@ -17,7 +17,6 @@ package org.meshtastic.core.repository import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.SharedFlow import okio.ByteString import org.meshtastic.core.model.SessionStatus @@ -49,9 +48,10 @@ interface SessionManager { * Used by `EnsureRemoteAdminSessionUseCase` to await a session refresh from a specific node without polling. * * Backed by a `MutableSharedFlow` with no replay; subscribers must subscribe **before** dispatching the request - * that triggers the refresh. + * that triggers the refresh. Exposed as a [Flow] via [kotlinx.coroutines.flow.SharedFlow.asFlow] + * (kotlinx.coroutines 1.11+) to hide the `SharedFlow` API from consumers while preserving hot-stream semantics. */ - val sessionRefreshFlow: SharedFlow + val sessionRefreshFlow: Flow /** * Cold per-node [SessionStatus] flow. Emits the current status synchronously on subscription and re-emits whenever diff --git a/core/service/src/androidHostTest/kotlin/org/meshtastic/core/service/ServiceBroadcastsTest.kt b/core/service/src/androidHostTest/kotlin/org/meshtastic/core/service/ServiceBroadcastsTest.kt index 46f28486f..16a9a000c 100644 --- a/core/service/src/androidHostTest/kotlin/org/meshtastic/core/service/ServiceBroadcastsTest.kt +++ b/core/service/src/androidHostTest/kotlin/org/meshtastic/core/service/ServiceBroadcastsTest.kt @@ -23,7 +23,7 @@ import co.touchlab.kermit.Severity import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.MutableSharedFlow import kotlinx.coroutines.flow.MutableStateFlow -import kotlinx.coroutines.flow.SharedFlow +import kotlinx.coroutines.flow.asFlow import org.junit.Before import org.junit.Test import org.junit.runner.RunWith @@ -78,7 +78,7 @@ class ServiceBroadcastsTest { override val errorMessage = MutableStateFlow(null) override val connectionProgress = MutableStateFlow(null) private val meshPackets = MutableSharedFlow() - override val meshPacketFlow: SharedFlow = meshPackets + override val meshPacketFlow: Flow = meshPackets.asFlow() override val tracerouteResponse = MutableStateFlow(null) override val neighborInfoResponse = MutableStateFlow(null) private val serviceActions = MutableSharedFlow() diff --git a/core/service/src/commonMain/kotlin/org/meshtastic/core/service/ServiceRepositoryImpl.kt b/core/service/src/commonMain/kotlin/org/meshtastic/core/service/ServiceRepositoryImpl.kt index 3a3c5f30c..5ad5c2d00 100644 --- a/core/service/src/commonMain/kotlin/org/meshtastic/core/service/ServiceRepositoryImpl.kt +++ b/core/service/src/commonMain/kotlin/org/meshtastic/core/service/ServiceRepositoryImpl.kt @@ -22,8 +22,8 @@ import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.MutableSharedFlow import kotlinx.coroutines.flow.MutableStateFlow -import kotlinx.coroutines.flow.SharedFlow import kotlinx.coroutines.flow.StateFlow +import kotlinx.coroutines.flow.asFlow import kotlinx.coroutines.flow.receiveAsFlow import org.meshtastic.core.model.ConnectionState import org.meshtastic.core.model.service.ServiceAction @@ -88,8 +88,8 @@ open class ServiceRepositoryImpl : ServiceRepository { } private val _meshPacketFlow = MutableSharedFlow(extraBufferCapacity = 64) - override val meshPacketFlow: SharedFlow - get() = _meshPacketFlow + override val meshPacketFlow: Flow + get() = _meshPacketFlow.asFlow() override suspend fun emitMeshPacket(packet: MeshPacket) { _meshPacketFlow.emit(packet) diff --git a/core/service/src/commonMain/kotlin/org/meshtastic/core/service/SharedRadioInterfaceService.kt b/core/service/src/commonMain/kotlin/org/meshtastic/core/service/SharedRadioInterfaceService.kt index 67db67d07..68a3573e9 100644 --- a/core/service/src/commonMain/kotlin/org/meshtastic/core/service/SharedRadioInterfaceService.kt +++ b/core/service/src/commonMain/kotlin/org/meshtastic/core/service/SharedRadioInterfaceService.kt @@ -30,9 +30,8 @@ import kotlinx.coroutines.delay import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.MutableSharedFlow import kotlinx.coroutines.flow.MutableStateFlow -import kotlinx.coroutines.flow.SharedFlow import kotlinx.coroutines.flow.StateFlow -import kotlinx.coroutines.flow.asSharedFlow +import kotlinx.coroutines.flow.asFlow import kotlinx.coroutines.flow.asStateFlow import kotlinx.coroutines.flow.catch import kotlinx.coroutines.flow.launchIn @@ -109,10 +108,10 @@ class SharedRadioInterfaceService( private val _meshActivity = MutableSharedFlow(extraBufferCapacity = 64, onBufferOverflow = BufferOverflow.DROP_OLDEST) - override val meshActivity: SharedFlow = _meshActivity.asSharedFlow() + override val meshActivity: Flow = _meshActivity.asFlow() private val _connectionError = MutableSharedFlow(extraBufferCapacity = 64) - override val connectionError: SharedFlow = _connectionError.asSharedFlow() + override val connectionError: Flow = _connectionError.asFlow() override val serviceScope: CoroutineScope get() = _serviceScope diff --git a/core/takserver/src/commonMain/kotlin/org/meshtastic/core/takserver/TAKServerManager.kt b/core/takserver/src/commonMain/kotlin/org/meshtastic/core/takserver/TAKServerManager.kt index 0a47321d6..cc6861d17 100644 --- a/core/takserver/src/commonMain/kotlin/org/meshtastic/core/takserver/TAKServerManager.kt +++ b/core/takserver/src/commonMain/kotlin/org/meshtastic/core/takserver/TAKServerManager.kt @@ -20,11 +20,11 @@ import co.touchlab.kermit.Logger import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Job import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.MutableSharedFlow import kotlinx.coroutines.flow.MutableStateFlow -import kotlinx.coroutines.flow.SharedFlow import kotlinx.coroutines.flow.StateFlow -import kotlinx.coroutines.flow.asSharedFlow +import kotlinx.coroutines.flow.asFlow import kotlinx.coroutines.flow.asStateFlow import kotlinx.coroutines.flow.consumeAsFlow import kotlinx.coroutines.launch @@ -35,7 +35,7 @@ import org.meshtastic.core.model.Node interface TAKServerManager { val isRunning: StateFlow val connectionCount: StateFlow - val inboundMessages: SharedFlow + val inboundMessages: Flow /** Start the TAK server using [scope]. Port is fixed at [TAKServer] construction time. */ fun start(scope: CoroutineScope) @@ -59,7 +59,7 @@ class TAKServerManagerImpl(private val takServer: TAKServer) : TAKServerManager override val connectionCount: StateFlow = takServer.connectionCount private val _inboundMessages = MutableSharedFlow() - override val inboundMessages: SharedFlow = _inboundMessages.asSharedFlow() + override val inboundMessages: Flow = _inboundMessages.asFlow() // Unbounded channel preserves FIFO ordering of inbound CoT messages under load. // onMessage is a non-suspend callback, so we trySend (always succeeds for UNLIMITED) diff --git a/core/testing/src/commonMain/kotlin/org/meshtastic/core/testing/FakeRadioInterfaceService.kt b/core/testing/src/commonMain/kotlin/org/meshtastic/core/testing/FakeRadioInterfaceService.kt index cd31a2e97..f7837e436 100644 --- a/core/testing/src/commonMain/kotlin/org/meshtastic/core/testing/FakeRadioInterfaceService.kt +++ b/core/testing/src/commonMain/kotlin/org/meshtastic/core/testing/FakeRadioInterfaceService.kt @@ -22,8 +22,8 @@ import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.MutableSharedFlow import kotlinx.coroutines.flow.MutableStateFlow -import kotlinx.coroutines.flow.SharedFlow import kotlinx.coroutines.flow.StateFlow +import kotlinx.coroutines.flow.asFlow import kotlinx.coroutines.flow.receiveAsFlow import org.meshtastic.core.model.ConnectionState import org.meshtastic.core.model.DeviceType @@ -57,10 +57,10 @@ class FakeRadioInterfaceService(override val serviceScope: CoroutineScope = Main override val receivedData: Flow = _receivedData.receiveAsFlow() private val _meshActivity = MutableSharedFlow() - override val meshActivity: SharedFlow = _meshActivity + override val meshActivity: Flow = _meshActivity.asFlow() private val _connectionError = MutableSharedFlow() - override val connectionError: SharedFlow = _connectionError + override val connectionError: Flow = _connectionError.asFlow() val sentToRadio = mutableListOf() var connectCalled = false diff --git a/core/testing/src/commonMain/kotlin/org/meshtastic/core/testing/FakeServiceRepository.kt b/core/testing/src/commonMain/kotlin/org/meshtastic/core/testing/FakeServiceRepository.kt index ae06843b6..494586e08 100644 --- a/core/testing/src/commonMain/kotlin/org/meshtastic/core/testing/FakeServiceRepository.kt +++ b/core/testing/src/commonMain/kotlin/org/meshtastic/core/testing/FakeServiceRepository.kt @@ -20,8 +20,8 @@ import co.touchlab.kermit.Severity import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.MutableSharedFlow import kotlinx.coroutines.flow.MutableStateFlow -import kotlinx.coroutines.flow.SharedFlow import kotlinx.coroutines.flow.StateFlow +import kotlinx.coroutines.flow.asFlow import org.meshtastic.core.model.ConnectionState import org.meshtastic.core.model.service.ServiceAction import org.meshtastic.core.model.service.TracerouteResponse @@ -69,7 +69,7 @@ class FakeServiceRepository : ServiceRepository { } private val _meshPacketFlow = MutableSharedFlow() - override val meshPacketFlow: SharedFlow = _meshPacketFlow + override val meshPacketFlow: Flow = _meshPacketFlow.asFlow() override suspend fun emitMeshPacket(packet: MeshPacket) { _meshPacketFlow.emit(packet) diff --git a/core/ui/src/commonMain/kotlin/org/meshtastic/core/ui/viewmodel/UIViewModel.kt b/core/ui/src/commonMain/kotlin/org/meshtastic/core/ui/viewmodel/UIViewModel.kt index f235a4919..4f1b32b4c 100644 --- a/core/ui/src/commonMain/kotlin/org/meshtastic/core/ui/viewmodel/UIViewModel.kt +++ b/core/ui/src/commonMain/kotlin/org/meshtastic/core/ui/viewmodel/UIViewModel.kt @@ -25,6 +25,7 @@ import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.MutableSharedFlow import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.StateFlow +import kotlinx.coroutines.flow.asFlow import kotlinx.coroutines.flow.asSharedFlow import kotlinx.coroutines.flow.asStateFlow import kotlinx.coroutines.flow.filterNotNull @@ -133,7 +134,7 @@ class UIViewModel( private val _scrollToTopEventFlow = MutableSharedFlow(extraBufferCapacity = 1, onBufferOverflow = BufferOverflow.DROP_OLDEST) - val scrollToTopEventFlow: Flow = _scrollToTopEventFlow.asSharedFlow() + val scrollToTopEventFlow: Flow = _scrollToTopEventFlow.asFlow() fun emitScrollToTopEvent(event: ScrollToTopEvent) { _scrollToTopEventFlow.tryEmit(event) diff --git a/core/ui/src/commonMain/kotlin/org/meshtastic/core/ui/viewmodel/ViewModelExtensions.kt b/core/ui/src/commonMain/kotlin/org/meshtastic/core/ui/viewmodel/ViewModelExtensions.kt index 747c35786..b27e7391d 100644 --- a/core/ui/src/commonMain/kotlin/org/meshtastic/core/ui/viewmodel/ViewModelExtensions.kt +++ b/core/ui/src/commonMain/kotlin/org/meshtastic/core/ui/viewmodel/ViewModelExtensions.kt @@ -129,7 +129,8 @@ fun safeLaunch( } /** - * Creates and returns a [MutableSharedFlow] intended for one-shot error events. Expose as `SharedFlow` via - * [asSharedFlow] in the ViewModel, and collect in the UI to show snackbars or toasts. + * Creates and returns a [MutableSharedFlow] intended for one-shot error events. Expose as `Flow` via + * [kotlinx.coroutines.flow.asFlow] in the ViewModel (hiding hot-flow semantics), and collect in the UI to show + * snackbars or toasts. */ fun errorEventFlow(): MutableSharedFlow = MutableSharedFlow(extraBufferCapacity = 1) diff --git a/desktop/src/main/kotlin/org/meshtastic/desktop/stub/NoopStubs.kt b/desktop/src/main/kotlin/org/meshtastic/desktop/stub/NoopStubs.kt index ccfe10d1e..081735e25 100644 --- a/desktop/src/main/kotlin/org/meshtastic/desktop/stub/NoopStubs.kt +++ b/desktop/src/main/kotlin/org/meshtastic/desktop/stub/NoopStubs.kt @@ -25,6 +25,7 @@ import kotlinx.coroutines.SupervisorJob import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.MutableSharedFlow import kotlinx.coroutines.flow.MutableStateFlow +import kotlinx.coroutines.flow.asFlow import kotlinx.coroutines.flow.emptyFlow import org.meshtastic.core.model.ConnectionState import org.meshtastic.core.model.DataPacket @@ -74,8 +75,8 @@ class NoopRadioInterfaceService : RadioInterfaceService { override fun isMockTransport(): Boolean = false override val receivedData = MutableSharedFlow() - override val meshActivity = MutableSharedFlow() - override val connectionError = MutableSharedFlow() + override val meshActivity: Flow = MutableSharedFlow().asFlow() + override val connectionError: Flow = MutableSharedFlow().asFlow() override fun sendToRadio(bytes: ByteArray) { logWarn("NoopRadioInterfaceService.sendToRadio(${bytes.size} bytes)") diff --git a/feature/firmware/src/commonMain/kotlin/org/meshtastic/feature/firmware/FirmwareUpdateViewModel.kt b/feature/firmware/src/commonMain/kotlin/org/meshtastic/feature/firmware/FirmwareUpdateViewModel.kt index fedc95cd0..7c0c7f272 100644 --- a/feature/firmware/src/commonMain/kotlin/org/meshtastic/feature/firmware/FirmwareUpdateViewModel.kt +++ b/feature/firmware/src/commonMain/kotlin/org/meshtastic/feature/firmware/FirmwareUpdateViewModel.kt @@ -20,6 +20,7 @@ import androidx.lifecycle.ViewModel import androidx.lifecycle.viewModelScope import co.touchlab.kermit.Logger import kotlinx.coroutines.CancellationException +import kotlinx.coroutines.CoroutineStart import kotlinx.coroutines.Job import kotlinx.coroutines.NonCancellable import kotlinx.coroutines.delay @@ -32,6 +33,7 @@ import kotlinx.coroutines.flow.filterNotNull import kotlinx.coroutines.flow.first import kotlinx.coroutines.flow.flowOf import kotlinx.coroutines.launch +import kotlinx.coroutines.withContext import kotlinx.coroutines.withTimeoutOrNull import org.jetbrains.compose.resources.StringResource import org.koin.core.annotation.KoinViewModel @@ -127,10 +129,10 @@ class FirmwareUpdateViewModel( override fun onCleared() { super.onCleared() // viewModelScope is already cancelled when onCleared() runs, so launch cleanup on the - // application-wide scope (SupervisorJob + ioDispatcher). NonCancellable keeps cleanup - // running even if something tries to cancel it mid-flight. - applicationScope.launch(NonCancellable) { - tempFirmwareFile = cleanupTemporaryFiles(fileHandler, tempFirmwareFile) + // application-wide scope (SupervisorJob + ioDispatcher). ATOMIC start + NonCancellable + // context keeps cleanup running even if something tries to cancel it mid-flight. + applicationScope.launch(start = CoroutineStart.ATOMIC) { + withContext(NonCancellable) { tempFirmwareFile = cleanupTemporaryFiles(fileHandler, tempFirmwareFile) } } } diff --git a/feature/firmware/src/commonMain/kotlin/org/meshtastic/feature/firmware/ota/dfu/LegacyDfuTransport.kt b/feature/firmware/src/commonMain/kotlin/org/meshtastic/feature/firmware/ota/dfu/LegacyDfuTransport.kt index 7df0b9916..e54fb157c 100644 --- a/feature/firmware/src/commonMain/kotlin/org/meshtastic/feature/firmware/ota/dfu/LegacyDfuTransport.kt +++ b/feature/firmware/src/commonMain/kotlin/org/meshtastic/feature/firmware/ota/dfu/LegacyDfuTransport.kt @@ -142,7 +142,7 @@ class LegacyDfuTransport( // Best-effort DFU Version read — gate out unsupported old bootloaders (SDK ≤ 6). val versionChar = service.characteristic(LEGACY_DFU_VERSION_UUID) val version = - runCatching { service.read(versionChar) } + safeCatching { service.read(versionChar) } .map { bytes -> if (bytes.size >= 2) (bytes[0].toInt() and 0xFF) or ((bytes[1].toInt() and 0xFF) shl 8) else -1 } diff --git a/feature/settings/src/commonMain/kotlin/org/meshtastic/feature/settings/radio/RadioConfigViewModel.kt b/feature/settings/src/commonMain/kotlin/org/meshtastic/feature/settings/radio/RadioConfigViewModel.kt index 67e671720..fd923a133 100644 --- a/feature/settings/src/commonMain/kotlin/org/meshtastic/feature/settings/radio/RadioConfigViewModel.kt +++ b/feature/settings/src/commonMain/kotlin/org/meshtastic/feature/settings/radio/RadioConfigViewModel.kt @@ -35,6 +35,7 @@ import org.jetbrains.compose.resources.StringResource import org.koin.core.annotation.InjectedParam import org.koin.core.annotation.KoinViewModel import org.meshtastic.core.common.util.CommonUri +import org.meshtastic.core.common.util.safeCatching import org.meshtastic.core.domain.usecase.settings.AdminActionsUseCase import org.meshtastic.core.domain.usecase.settings.ExportProfileUseCase import org.meshtastic.core.domain.usecase.settings.ExportSecurityConfigUseCase @@ -165,7 +166,7 @@ open class RadioConfigViewModel( probeJob = viewModelScope.launch { val result = - runCatching { mqttManager.probe(address, tlsEnabled, username, password) } + safeCatching { mqttManager.probe(address, tlsEnabled, username, password) } .getOrElse { e -> Logger.w(e) { "MQTT probe threw" } MqttProbeStatus.Other(message = e.message) diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 3d0ddefe6..58a97430b 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -19,7 +19,7 @@ koin-plugin = "1.0.0-RC2" # Kotlin kotlin = "2.3.21" -kotlinx-coroutines-android = "1.10.2" +kotlinx-coroutines-android = "1.11.0-rc02" kotlinx-datetime = "0.7.1" kotlinx-serialization = "1.11.0" ktlint = "1.7.1"