From 620100b0d335fb2f8d97272c8a9c16babf8c3f00 Mon Sep 17 00:00:00 2001 From: Andre K Date: Tue, 17 Jan 2023 18:46:04 -0300 Subject: [PATCH] feat: add packet transmit queue (#566) --- .../geeksville/mesh/service/MeshService.kt | 83 +++++++++++++++++-- 1 file changed, 78 insertions(+), 5 deletions(-) diff --git a/app/src/main/java/com/geeksville/mesh/service/MeshService.kt b/app/src/main/java/com/geeksville/mesh/service/MeshService.kt index 4bb0eb81a..ce8406d44 100644 --- a/app/src/main/java/com/geeksville/mesh/service/MeshService.kt +++ b/app/src/main/java/com/geeksville/mesh/service/MeshService.kt @@ -42,6 +42,10 @@ import kotlinx.coroutines.flow.launchIn import kotlinx.coroutines.flow.onEach import kotlinx.serialization.json.Json import java.util.* +import java.util.concurrent.CompletableFuture +import java.util.concurrent.ConcurrentLinkedDeque +import java.util.concurrent.TimeUnit +import java.util.concurrent.TimeoutException import javax.inject.Inject import kotlin.math.absoluteValue @@ -219,9 +223,8 @@ class MeshService : Service(), Logging { * Send a mesh packet to the radio, if the radio is not currently connected this function will throw NotConnectedException */ private fun sendToRadio(packet: MeshPacket) { - sendToRadio(ToRadio.newBuilder().apply { - this.packet = packet - }) + queuedPackets.add(packet) + startPacketQueue() } private fun updateMessageNotification(message: DataPacket) = @@ -657,6 +660,7 @@ class MeshService : Service(), Logging { val u = MeshProtos.Routing.parseFrom(data.payload) val isAck = u.errorReasonValue == MeshProtos.Routing.Error.NONE_VALUE handleAckNak(isAck, fromId, data.requestId) + queueResponse.remove(data.requestId)?.complete(true) } Portnums.PortNum.ADMIN_APP_VALUE -> { @@ -776,6 +780,63 @@ class MeshService : Service(), Logging { } } + private val queuedPackets = ConcurrentLinkedDeque() + private val queueResponse = mutableMapOf>() + private var queueJob: Job? = null + + private fun sendPacket(packet: MeshPacket): CompletableFuture { + // send the packet to the radio and return a CompletableFuture that will be completed with the result + val future = CompletableFuture() + queueResponse[packet.id] = future + try { + sendToRadio(ToRadio.newBuilder().apply { + this.packet = packet + }) + // FIXME remove when MeshPacketQueue is fixed + if (!packet.wantAck) future.complete(true) + } catch (ex: Exception) { + errormsg("sendToRadio error:", ex) + future.complete(false) + } + return future + } + + private fun startPacketQueue() { + if (queueJob?.isActive == true) return + queueJob = serviceScope.handledLaunch { + debug("packet queueJob started") + while (connectionState == ConnectionState.CONNECTED) { + // take the first packet from the queue head + val packet = queuedPackets.poll() ?: break + // send packet to the radio and wait for response + val response = sendPacket(packet) + try { + debug("queueJob packet id=${packet.id.toUInt()} waiting") + @Suppress("BlockingMethodInNonBlockingContext") + val success = response.get(45, TimeUnit.SECONDS) + debug("queueJob packet id=${packet.id.toUInt()} success $success") + if (!success) { + // if send operation fails, add packet back to queue head and retry + queuedPackets.addFirst(packet) + } + } catch (e: TimeoutException) { + debug("queueJob timeout waiting packet id=${packet.id.toUInt()}") + queuedPackets.addFirst(packet) + } + } + } + } + + private fun stopPacketQueue() { + if (queueJob?.isActive == true) { + debug("Stopping packet queueJob") + queueJob?.cancel() + queueJob = null + queuedPackets.clear() + queueResponse.clear() + } + } + private fun sendNow(p: DataPacket) { val packet = toMeshPacket(p) p.status = MessageStatus.ENROUTE @@ -950,7 +1011,7 @@ class MeshService : Service(), Logging { // Just in case the user uncleanly reboots the phone, save now (we normally save in onDestroy) saveSettings() - // lost radio connection, therefore no need to keep listening to GPS + stopPacketQueue() stopLocationRequests() if (connectTimeMsec != 0L) { @@ -986,7 +1047,7 @@ class MeshService : Service(), Logging { // Just in case the user uncleanly reboots the phone, save now (we normally save in onDestroy) saveSettings() - // lost radio connection, therefore no need to keep listening to GPS + stopPacketQueue() stopLocationRequests() GeeksvilleApplication.analytics.track( @@ -1084,6 +1145,7 @@ class MeshService : Service(), Logging { MeshProtos.FromRadio.CHANNEL_FIELD_NUMBER -> handleChannel(proto.channel) MeshProtos.FromRadio.CONFIG_FIELD_NUMBER -> handleDeviceConfig(proto.config) MeshProtos.FromRadio.MODULECONFIG_FIELD_NUMBER -> handleModuleConfig(proto.moduleConfig) + MeshProtos.FromRadio.QUEUESTATUS_FIELD_NUMBER -> handleQueueStatus(proto.queueStatus) else -> errormsg("Unexpected FromRadio variant") } } catch (ex: InvalidProtocolBufferException) { @@ -1124,6 +1186,17 @@ class MeshService : Service(), Logging { setLocalModuleConfig(config) } + private fun handleQueueStatus(queueStatus: MeshProtos.QueueStatus) { + debug("queueStatus ${queueStatus.toOneLineString()}") + val (success, isFull, requestId) = with(queueStatus) { + // FIXME use "free == 0" when MeshPacketQueue is fixed + Triple(res == 0, free <= 16, meshPacketId) + } + if (success && isFull) return // Queue is full, wait for free != 0 + if (requestId != 0) queueResponse.remove(requestId)?.complete(success) + else queueResponse.entries.lastOrNull { !it.value.isDone }?.value?.complete(success) + } + private fun handleChannel(ch: ChannelProtos.Channel) { debug("Received channel ${ch.index}") val packetToSave = MeshLog(