diff --git a/bin/dmshell_client.py b/bin/dmshell_client.py index 90e4270e7..b5106fc63 100644 --- a/bin/dmshell_client.py +++ b/bin/dmshell_client.py @@ -33,6 +33,17 @@ HEARTBEAT_IDLE_DELAY_SEC = 5.0 HEARTBEAT_REPEAT_SEC = 15.0 HEARTBEAT_POLL_INTERVAL_SEC = 0.25 +# --- Half-duplex turn-taking ("talking stick"), carried in RemoteShell.flags --- +# On a 2-party LoRa link, Meshtastic's CSMA-CA collapses when both ends transmit at once +# (synchronized same-slot collisions that CAD can't prevent). These flags let exactly one side +# transmit at a time. The client is the master / idle-owner: it holds the token silently when +# idle and grants it to the server whenever the server may need to respond. +FLAG_GRANT = 0x01 # I am handing you the turn; you may transmit now +FLAG_MORE = 0x02 # I yielded under a budget but still have data queued (grant back promptly) +FLAG_RTS = 0x04 # (server->client) I have output but no turn; please grant me one +CLIENT_TURN_BUDGET = 4 # max frames the client sends per turn before yielding +TURN_RECLAIM_SEC = 8.0 # reclaim the token if a grant goes unanswered this long (anti-deadlock) + def parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser( @@ -62,6 +73,19 @@ def parse_args() -> argparse.Namespace: parser.add_argument("--close-after", type=float, default=2.0, help="seconds to wait before closing in command mode") parser.add_argument("--timeout", type=float, default=10.0, help="seconds to wait for API/session events") parser.add_argument("--verbose", action="store_true", help="print extra protocol events") + parser.add_argument( + "--no-turn-taking", + dest="turn_taking", + action="store_false", + help="disable half-duplex turn-taking (revert to free-running send; needed for old peers)", + ) + parser.add_argument( + "--turn-budget", type=int, default=CLIENT_TURN_BUDGET, help="max frames to send per turn before yielding" + ) + parser.add_argument( + "--turn-reclaim", type=float, default=TURN_RECLAIM_SEC, help="seconds before reclaiming an unanswered turn grant" + ) + parser.set_defaults(turn_taking=True) return parser.parse_args() @@ -281,6 +305,24 @@ class SentShellFrame: last_rx_seq: int = 0 +@dataclass +class TxIntent: + """A frame queued for transmission; the sender thread allocates seq and adds turn flags.""" + + op: int + payload: bytes = b"" + cols: int = 0 + rows: int = 0 + session_id: Optional[int] = None + ack_seq: Optional[int] = None + seq: Optional[int] = None + flags: int = 0 + last_tx_seq: int = 0 + last_rx_seq: int = 0 + remember: bool = True + heartbeat: bool = False + + @dataclass class SessionState: pb2: object # ProtoModules with mesh and portnums attributes @@ -310,6 +352,38 @@ class SessionState: last_transport_activity_time: float = field(default_factory=time.monotonic) last_heartbeat_sent_time: float = 0.0 + # --- Half-duplex turn-taking state --- + turn_taking: bool = True + turn_budget: int = CLIENT_TURN_BUDGET + turn_reclaim_sec: float = TURN_RECLAIM_SEC + has_token: bool = True # client is the master / idle-owner: starts holding the turn + grant_requested: bool = False # peer asked for a turn (FLAG_MORE / FLAG_RTS) + last_grant_time: float = 0.0 # when we last handed the turn to the peer + last_inbound_time: float = 0.0 # when we last heard anything (to detect an unanswered grant) + tx_intents: deque = field(default_factory=deque) + turn_cv: threading.Condition = field(default_factory=threading.Condition) + + def enqueue_tx(self, intent: TxIntent) -> None: + with self.turn_cv: + self.tx_intents.append(intent) + self.turn_cv.notify_all() + + def note_turn_flags(self, flags: int) -> None: + """Apply channel-access flags from an inbound frame (called by the reader thread).""" + if not self.turn_taking: + return + with self.turn_cv: + self.last_inbound_time = time.monotonic() + if flags & FLAG_GRANT: + self.has_token = True + if flags & (FLAG_MORE | FLAG_RTS): + self.grant_requested = True + self.turn_cv.notify_all() + + def wake_sender(self) -> None: + with self.turn_cv: + self.turn_cv.notify_all() + def alloc_seq(self) -> int: with self.tx_lock: value = self.next_seq @@ -510,10 +584,40 @@ def send_shell_frame( remember: bool = True, heartbeat: bool = False, ) -> int: + """Queue a frame for transmission. The sender thread allocates seq, adds turn flags, and sends. + + (transport is unused here — the sender thread owns the transport — but kept for call-site + compatibility.) + """ + state.enqueue_tx( + TxIntent( + op=op, + payload=payload, + cols=cols, + rows=rows, + session_id=session_id, + ack_seq=ack_seq, + seq=seq, + flags=flags, + last_tx_seq=last_tx_seq, + last_rx_seq=last_rx_seq, + remember=remember, + heartbeat=heartbeat, + ) + ) + return seq if seq is not None else 0 + + +def transmit_frame(transport, state: SessionState, intent: TxIntent) -> int: + """Actually serialize and send one frame over the API socket. Only the sender thread calls this.""" + op = intent.op + seq = intent.seq if seq is None: seq = 0 if op == state.pb2.mesh.RemoteShell.ACK else state.alloc_seq() + ack_seq = intent.ack_seq if ack_seq is None: ack_seq = state.current_ack_seq() + session_id = intent.session_id if session_id is None: session_id = state.session_id @@ -522,34 +626,117 @@ def send_shell_frame( shell.session_id = session_id shell.seq = seq shell.ack_seq = ack_seq - shell.cols = cols - shell.rows = rows - shell.flags = flags - shell.last_tx_seq = last_tx_seq - shell.last_rx_seq = last_rx_seq - if payload: - shell.payload = payload + shell.cols = intent.cols + shell.rows = intent.rows + shell.flags = intent.flags + shell.last_tx_seq = intent.last_tx_seq + shell.last_rx_seq = intent.last_rx_seq + if intent.payload: + shell.payload = intent.payload with state.socket_lock: send_toradio(transport, make_toradio_packet(state.pb2, state, shell)) - if remember: + if intent.remember: state.remember_sent_frame( SentShellFrame( op=op, session_id=session_id, seq=seq, ack_seq=ack_seq, - payload=payload, - cols=cols, - rows=rows, - flags=flags, - last_tx_seq=last_tx_seq, - last_rx_seq=last_rx_seq, + payload=intent.payload, + cols=intent.cols, + rows=intent.rows, + flags=intent.flags, + last_tx_seq=intent.last_tx_seq, + last_rx_seq=intent.last_rx_seq, ) ) - state.note_outbound_packet(heartbeat=heartbeat) + state.note_outbound_packet(heartbeat=intent.heartbeat) return seq +def sender_loop(transport, state: SessionState) -> None: + """Single transmitter. Sends queued frames only while we hold the turn, then grants it away. + + This is what makes the link half-duplex: with turn-taking on, only one side transmits at a + time, eliminating the synchronized collisions that wreck the CSMA-CA scheme on a 2-party link. + """ + while not state.stopped: + burst: list[TxIntent] = [] + send_bare_grant = False + more_after = False + + with state.turn_cv: + while not state.stopped: + if not state.turn_taking: + if state.tx_intents: + break + state.turn_cv.wait(timeout=1.0) + continue + if state.has_token: + if state.tx_intents or state.grant_requested: + break + # Hold the token and stay silent until we have work or the peer requests a turn. + state.turn_cv.wait(timeout=1.0) + else: + # We granted the turn away. Reclaim it only if we granted a while ago AND the + # link has gone quiet (no inbound) for that long — i.e. the peer isn't mid-turn, + # so either our grant or the peer's yield-grant was lost. Requiring quiet (not + # just grant age) avoids reclaiming mid-stream, which would cause double-talk. + now = time.monotonic() + if ( + state.last_grant_time + and (now - state.last_grant_time) > state.turn_reclaim_sec + and (now - state.last_inbound_time) > state.turn_reclaim_sec + ): + state.has_token = True + if state.verbose: + state.event_queue.put("turn: reclaimed unanswered grant") + continue + ref = max(state.last_grant_time, state.last_inbound_time) + timeout = max(0.05, state.turn_reclaim_sec - (now - ref)) if ref else state.turn_reclaim_sec + state.turn_cv.wait(timeout=timeout) + + if state.stopped: + return + + # We hold the turn (or turn-taking is off). Pull a burst of intents to send. + budget = None if not state.turn_taking else max(1, state.turn_budget) + while state.tx_intents and (budget is None or len(burst) < budget): + burst.append(state.tx_intents.popleft()) + + if state.turn_taking: + more_after = bool(state.tx_intents) # still queued beyond our budget + if burst: + # The burst's last frame grants the turn, satisfying any pending request. + state.grant_requested = False + state.has_token = False + state.last_grant_time = time.monotonic() + elif state.grant_requested: + send_bare_grant = True + state.grant_requested = False + state.has_token = False + state.last_grant_time = time.monotonic() + + # Transmit outside the lock (socket I/O may block). + try: + if burst: + n = len(burst) + for i, intent in enumerate(burst): + if state.turn_taking and i == n - 1: + # Last frame of our turn: hand the token back to the peer. + intent.flags |= FLAG_GRANT + if more_after: + intent.flags |= FLAG_MORE + transmit_frame(transport, state, intent) + elif send_bare_grant: + transmit_frame(transport, state, TxIntent(op=state.pb2.mesh.RemoteShell.ACK, seq=0, remember=False, flags=FLAG_GRANT)) + except Exception as exc: + if not state.stopped: + state.event_queue.put(f"sender error: {exc}") + state.closed_event.set() + return + + def send_ack_frame(transport, state: SessionState, replay_from: Optional[int] = None) -> None: send_shell_frame( transport, @@ -676,6 +863,9 @@ def reader_loop(transport, state: SessionState) -> None: if not shell: continue state.note_inbound_packet() + # Honor channel-access flags (GRANT/MORE/RTS) on every inbound frame, regardless of + # payload ordering, so a granted turn is never lost to a gap. + state.note_turn_flags(shell.flags) #state.prune_sent_frames(shell.ack_seq) if shell.op == state.pb2.mesh.RemoteShell.ACK: #state.event_queue.put("peer requested replay") @@ -902,6 +1092,9 @@ def main() -> int: target=parse_node_num(args.to), channel=args.channel, verbose=args.verbose, + turn_taking=args.turn_taking, + turn_budget=args.turn_budget, + turn_reclaim_sec=args.turn_reclaim, ) cols, rows = resolve_initial_terminal_size(args.cols, args.rows) @@ -913,6 +1106,11 @@ def main() -> int: reader = threading.Thread(target=reader_loop, args=(transport, state), daemon=True) reader.start() + # The sender thread owns all transmission and the turn-taking token. Start it before OPEN + # so the queued OPEN goes out (with a GRANT so the server can reply OPEN_OK). + sender = threading.Thread(target=sender_loop, args=(transport, state), daemon=True) + sender.start() + send_shell_frame(transport, state, pb2.mesh.RemoteShell.OPEN, cols=cols, rows=rows) if not state.opened_event.wait(timeout=args.timeout): raise SystemExit("timed out waiting for OPEN_OK from remote DMShell") @@ -927,9 +1125,11 @@ def main() -> int: run_interactive_mode(transport, state) state.stopped = True + state.wake_sender() drain_events(state) reader.join(timeout=1.0) heartbeat.join(timeout=1.0) + sender.join(timeout=1.0) state.close_replay_log() finally: transport.close() diff --git a/src/modules/DMShell.cpp b/src/modules/DMShell.cpp index f2cfe1e27..b33da1322 100644 --- a/src/modules/DMShell.cpp +++ b/src/modules/DMShell.cpp @@ -13,6 +13,7 @@ #include "pb_encode.h" #include #include +#include #include #include #include @@ -29,6 +30,16 @@ namespace constexpr uint16_t PTY_COLS_DEFAULT = 120; constexpr uint16_t PTY_ROWS_DEFAULT = 40; constexpr size_t MAX_MESSAGE_SIZE = 200; + +// --- Half-duplex turn-taking ("talking stick") protocol, carried in RemoteShell.flags --- +// On a 2-party LoRa link, Meshtastic's CSMA-CA breaks down when both ends transmit at once +// (synchronized same-slot collisions that CAD can't prevent). These flags let exactly one +// side transmit at a time, eliminating those collisions. The client is the master/idle-owner. +constexpr uint32_t TURN_FLAG_GRANT = 0x01; // I am handing you the turn; you may transmit now +constexpr uint32_t TURN_FLAG_MORE = 0x02; // I yielded under a budget but still have data queued +constexpr uint32_t TURN_FLAG_RTS = 0x04; // I have output but no turn; please grant me one +constexpr size_t TURN_BUDGET_FRAMES = 4; // max output frames per granted turn before yielding +constexpr uint32_t RTS_RETRY_MS = 1000; // min interval between request-to-send frames } // namespace DMShellModule::DMShellModule() @@ -51,8 +62,14 @@ ProcessMessage DMShellModule::handleReceived(const meshtastic_MeshPacket &mp) } if (frame.op == meshtastic_RemoteShell_OpCode_ACK) { - if (session.active && frame.session_id == session.sessionId && getFrom(&mp) == session.peer && frame.last_rx_seq > 0) { - resendFramesFrom(frame.last_rx_seq + 1); + if (session.active && frame.session_id == session.sessionId && getFrom(&mp) == session.peer) { + applyTurnFlags(frame); + if (frame.last_rx_seq > 0) { + resendFramesFrom(frame.last_rx_seq + 1); + } + // A standalone grant (client re-granting for MORE, replying to our RTS, or a heartbeat + // poll) is our cue to flush any pending shell output during this turn. + serviceTurn(); } return ProcessMessage::CONTINUE; } @@ -88,7 +105,14 @@ ProcessMessage DMShellModule::handleReceived(const meshtastic_MeshPacket &mp) return ProcessMessage::STOP; } + // Honor channel-access flags before ordering checks: a GRANT transfers the turn regardless of + // whether this frame's payload is in order. + applyTurnFlags(frame); + if (!shouldProcessIncomingFrame(frame)) { + // We won't process the payload (gap/duplicate), but we may now hold the turn, so flush + // output and/or hand it back rather than stalling the link. + serviceTurn(); return ProcessMessage::STOP; } @@ -98,7 +122,9 @@ ProcessMessage DMShellModule::handleReceived(const meshtastic_MeshPacket &mp) case meshtastic_RemoteShell_OpCode_INPUT: if (!writeSessionInput(frame)) { sendError("input_write_failed"); - } else { + } else if (!session.turnManaged) { + // Legacy peer (no turn-taking): echo immediately as before. In managed mode the + // serviceTurn() call at the end of handleReceived drains the echo and yields the turn. uint8_t outBuf[MAX_MESSAGE_SIZE]; const ssize_t bytesRead = read(session.masterFd, outBuf, sizeof(outBuf)); if (bytesRead > 0) { @@ -164,6 +190,9 @@ ProcessMessage DMShellModule::handleReceived(const meshtastic_MeshPacket &mp) break; } + // If the peer granted us the turn, flush pending shell output and hand the turn back. + serviceTurn(); + return ProcessMessage::STOP; } @@ -189,6 +218,19 @@ int32_t DMShellModule::runOnce() return 50; } + if (session.turnManaged) { + if (session.hasToken) { + // We hold the turn: flush output and hand it back. + serviceTurn(); + } else if (ptyHasOutput() && !Throttle::isWithinTimespanMs(session.lastRtsMs, RTS_RETRY_MS)) { + // Unsolicited shell output but no turn: ask the client to grant us one. + sendRts(); + session.lastRtsMs = millis(); + } + return 50; + } + + // Legacy free-send path (peer is not using turn-taking). uint8_t outBuf[MAX_MESSAGE_SIZE]; while (session.masterFd >= 0) { const ssize_t bytesRead = read(session.masterFd, outBuf, sizeof(outBuf)); @@ -321,6 +363,12 @@ bool DMShellModule::openSession(const meshtastic_MeshPacket &mp, const meshtasti session.nextExpectedRxSeq = frame.seq + 1; session.highestSeenRxSeq = frame.seq; session.lastActivityMs = millis(); + session.turnManaged = false; + session.hasToken = false; + session.lastRtsMs = 0; + + // Honor any GRANT the client put on OPEN (opts this session into turn-taking). + applyTurnFlags(frame); meshtastic_RemoteShell newFrame = { .op = meshtastic_RemoteShell_OpCode_OPEN_OK, @@ -329,10 +377,14 @@ bool DMShellModule::openSession(const meshtastic_MeshPacket &mp, const meshtasti .ack_seq = frame.seq, .cols = ws.ws_col, .rows = ws.ws_row, - .flags = 0, + .flags = session.turnManaged ? TURN_FLAG_GRANT : 0u, }; newFrame.payload.size = 0; sendFrameToPeer(session.peer, newFrame, true); + if (session.turnManaged) { + // OPEN_OK handed the turn back to the client; it is now the idle-owner. + session.hasToken = false; + } LOG_INFO("DMShell: opened session=0x%x peer=0x%x pid=%d", session.sessionId, session.peer, session.childPid); return true; @@ -579,6 +631,130 @@ void DMShellModule::sendFrameToPeer(NodeNum peer, meshtastic_RemoteShell frame, service->sendToMesh(packet); } +void DMShellModule::applyTurnFlags(const meshtastic_RemoteShell &frame) +{ + if (frame.flags & (TURN_FLAG_GRANT | TURN_FLAG_MORE | TURN_FLAG_RTS)) { + session.turnManaged = true; // peer speaks turn-taking; enable gating for this session + } + if (frame.flags & TURN_FLAG_GRANT) { + session.hasToken = true; + } +} + +bool DMShellModule::ptyHasOutput() +{ + if (session.masterFd < 0) { + return false; + } + struct pollfd pfd = {}; + pfd.fd = session.masterFd; + pfd.events = POLLIN; + return poll(&pfd, 1, 0) > 0 && (pfd.revents & POLLIN); +} + +void DMShellModule::sendOutputFrame(const uint8_t *data, size_t len, uint32_t extraFlags) +{ + meshtastic_RemoteShell frame = { + .op = meshtastic_RemoteShell_OpCode_OUTPUT, + .session_id = session.sessionId, + .seq = session.nextTxSeq++, + .ack_seq = session.lastAckedRxSeq, + .cols = 0, + .rows = 0, + .flags = extraFlags, + }; + assert(len <= sizeof(frame.payload.bytes)); + memcpy(frame.payload.bytes, data, len); + frame.payload.size = len; + sendFrameToPeer(session.peer, frame, true); +} + +void DMShellModule::sendTurnGrant(bool more) +{ + meshtastic_RemoteShell frame = { + .op = meshtastic_RemoteShell_OpCode_ACK, + .session_id = session.sessionId, + .seq = 0, + .ack_seq = session.lastAckedRxSeq, + .cols = 0, + .rows = 0, + .flags = TURN_FLAG_GRANT | (more ? TURN_FLAG_MORE : 0u), + .last_rx_seq = 0, + }; + frame.payload.size = 0; + sendFrameToPeer(session.peer, frame, false); +} + +void DMShellModule::sendRts() +{ + meshtastic_RemoteShell frame = { + .op = meshtastic_RemoteShell_OpCode_ACK, + .session_id = session.sessionId, + .seq = 0, + .ack_seq = session.lastAckedRxSeq, + .cols = 0, + .rows = 0, + .flags = TURN_FLAG_RTS, + .last_rx_seq = 0, + }; + frame.payload.size = 0; + sendFrameToPeer(session.peer, frame, false); +} + +// Called when we hold the turn: send up to TURN_BUDGET_FRAMES chunks of shell output, then hand +// the turn back to the client. The grant is piggybacked on the last output frame (or sent as a +// standalone ACK if there was nothing to send), so the token always settles back at the client. +void DMShellModule::serviceTurn() +{ + if (!session.active || !session.turnManaged || !session.hasToken) { + return; + } + + uint8_t chunks[TURN_BUDGET_FRAMES][MAX_MESSAGE_SIZE]; + size_t chunkLen[TURN_BUDGET_FRAMES] = {0}; + size_t nChunks = 0; + bool eof = false; + bool readError = false; + + while (nChunks < TURN_BUDGET_FRAMES && session.masterFd >= 0) { + const ssize_t n = read(session.masterFd, chunks[nChunks], MAX_MESSAGE_SIZE); + if (n > 0) { + chunkLen[nChunks] = (size_t)n; + nChunks++; + } else if (n == 0) { + eof = true; + break; + } else { + if (errno != EAGAIN && errno != EWOULDBLOCK) { + readError = true; + } + break; + } + } + + // If we filled the budget, there may still be more output waiting; ask for a prompt re-grant. + const bool more = (nChunks == TURN_BUDGET_FRAMES) && ptyHasOutput(); + + session.hasToken = false; // we are handing the turn back below + if (nChunks > 0) { + session.lastActivityMs = millis(); + for (size_t i = 0; i < nChunks; i++) { + const uint32_t flags = (i + 1 == nChunks) ? (TURN_FLAG_GRANT | (more ? TURN_FLAG_MORE : 0u)) : 0u; + sendOutputFrame(chunks[i], chunkLen[i], flags); + } + } else { + // Nothing to send: hand the turn straight back so the link goes quiet at the client. + sendTurnGrant(false); + } + + if (eof) { + closeSession("pty_eof", true); + } else if (readError) { + LOG_WARN("DMShell: PTY read error errno=%d", errno); + closeSession("pty_read_error", true); + } +} + void DMShellModule::sendError(const char *message, NodeNum peer) { const size_t len = strnlen(message, MAX_MESSAGE_SIZE); diff --git a/src/modules/DMShell.h b/src/modules/DMShell.h index 51e4e710e..87a99fd08 100644 --- a/src/modules/DMShell.h +++ b/src/modules/DMShell.h @@ -24,6 +24,10 @@ struct DMShellSession { uint32_t nextExpectedRxSeq = 1; uint32_t highestSeenRxSeq = 0; uint32_t lastActivityMs = 0; + // --- Half-duplex turn-taking ("talking stick") state --- + bool turnManaged = false; // becomes true once the peer uses turn-taking flags; enables gating + bool hasToken = false; // do we currently hold the right to transmit? + uint32_t lastRtsMs = 0; // last time we asked the client for a turn (rate-limit) struct SentFrame { bool valid = false; meshtastic_RemoteShell_OpCode op = meshtastic_RemoteShell_OpCode_ERROR; @@ -70,6 +74,14 @@ class DMShellModule : private concurrency::OSThread, public SinglePortModule void sendAck(uint32_t replayFromSeq = 0); void sendFrameToPeer(NodeNum peer, meshtastic_RemoteShell frame, bool remember = true); void sendError(const char *message, NodeNum peer = 0); + + // --- Turn-taking helpers --- + void applyTurnFlags(const meshtastic_RemoteShell &frame); + bool ptyHasOutput(); + void serviceTurn(); + void sendOutputFrame(const uint8_t *data, size_t len, uint32_t extraFlags); + void sendTurnGrant(bool more); + void sendRts(); }; extern DMShellModule *dmShellModule;