From 1192b311660fdb23d03fc79ff9f05ab4bfcdc313 Mon Sep 17 00:00:00 2001 From: Ryan Malloy Date: Wed, 11 Feb 2026 11:55:05 -0700 Subject: [PATCH] Add R/L/D protocol extensions for RSSI sky scanning MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Extend the rotctld wire protocol with three new commands that enable signal strength measurement through the Carryout G2's DVB subsystem: - R [n]: Read RSSI (handles motor→dvb→rssi→motor menu dance internally) - L: Enable LNA for signal reception (one-time pre-scan setup) - D: Discover capabilities (returns CAPS:rssi,lna for G2, empty for others) Non-G2 protocols return RPRT -6 (not available) for R and L commands. The menu state invariant is maintained after every operation so P commands continue to work between RSSI reads. --- src/travler_rotor/antenna.py | 5 +++ src/travler_rotor/rotctld.py | 71 ++++++++++++++++++++++++++++++++++++ 2 files changed, 76 insertions(+) diff --git a/src/travler_rotor/antenna.py b/src/travler_rotor/antenna.py index 0eb020b..b62de2b 100644 --- a/src/travler_rotor/antenna.py +++ b/src/travler_rotor/antenna.py @@ -50,6 +50,11 @@ class TravlerAntenna: def config(self) -> AntennaConfig: return self._config + @property + def protocol(self) -> FirmwareProtocol: + """Access the underlying firmware protocol (for capability checks).""" + return self._protocol + @property def is_connected(self) -> bool: return self._protocol.is_connected diff --git a/src/travler_rotor/rotctld.py b/src/travler_rotor/rotctld.py index 0102edc..8bdd79e 100644 --- a/src/travler_rotor/rotctld.py +++ b/src/travler_rotor/rotctld.py @@ -8,6 +8,12 @@ Hamlib clients) use for AZ/EL rotor control: S — stop / disconnect _ — get model name q — quit connection + +Extended commands for sky-scan integration (CarryoutG2 only): + + R — read RSSI signal strength ("R [iterations]") + L — enable LNA for signal reception + D — discover supported protocol extensions """ from __future__ import annotations @@ -16,6 +22,7 @@ import logging import socket from travler_rotor.antenna import TravlerAntenna +from travler_rotor.protocol import CarryoutG2Protocol logger = logging.getLogger(__name__) @@ -97,6 +104,12 @@ class RotctldServer: self._handle_model_name(conn) elif cmd == "q": break + elif cmd == "R": + self._handle_read_rssi(conn, cmd_parts) + elif cmd == "L": + self._handle_enable_lna(conn) + elif cmd == "D": + self._handle_capabilities(conn) else: logger.warning("Unknown command: %s", cmd) conn.sendall(b"RPRT -1\n") @@ -133,3 +146,61 @@ class RotctldServer: def _handle_model_name(self, conn: socket.socket) -> None: """Respond to '_' — return model identification string.""" conn.sendall(f"{MODEL_NAME}\n".encode()) + + def _handle_read_rssi(self, conn: socket.socket, parts: list[str]) -> None: + """Respond to 'R [n]' — read RSSI signal strength. + + Requires CarryoutG2Protocol. Handles the DVB menu switching internally: + motor menu -> quit -> dvb menu -> rssi -> quit dvb -> motor menu. + Non-G2 rotors return RPRT -6 (not available). + """ + if not isinstance(self._antenna.protocol, CarryoutG2Protocol): + conn.sendall(b"RPRT -6\n") + return + + try: + protocol: CarryoutG2Protocol = self._antenna.protocol # type: ignore[assignment] + iterations = 10 + if len(parts) > 1: + iterations = int(parts[1]) + + protocol.quit_submenu() + protocol.enter_dvb_menu() + reading = protocol.get_rssi(iterations) + protocol.quit_submenu() + protocol.enter_motor_menu() + + response = f"{reading.reads}\n{reading.average}\n{reading.current}\n" + conn.sendall(response.encode("utf-8")) + except Exception: + logger.exception("Failed to read RSSI") + conn.sendall(b"RPRT -1\n") + + def _handle_enable_lna(self, conn: socket.socket) -> None: + """Respond to 'L' — enable LNA for signal reception. + + One-time setup before scanning. Requires CarryoutG2Protocol. + """ + if not isinstance(self._antenna.protocol, CarryoutG2Protocol): + conn.sendall(b"RPRT -6\n") + return + + try: + protocol: CarryoutG2Protocol = self._antenna.protocol # type: ignore[assignment] + protocol.quit_submenu() + protocol.enter_dvb_menu() + protocol.enable_lna() + protocol.quit_submenu() + protocol.enter_motor_menu() + + conn.sendall(b"RPRT 0\n") + except Exception: + logger.exception("Failed to enable LNA") + conn.sendall(b"RPRT -1\n") + + def _handle_capabilities(self, conn: socket.socket) -> None: + """Respond to 'D' — discover supported protocol extensions.""" + if isinstance(self._antenna.protocol, CarryoutG2Protocol): + conn.sendall(b"CAPS:rssi,lna\n") + else: + conn.sendall(b"CAPS:\n")