Add R/L/D protocol extensions for RSSI sky scanning

Extend the rotctld wire protocol with three new commands that enable
signal strength measurement through the Carryout G2's DVB subsystem:

- R [n]: Read RSSI (handles motor→dvb→rssi→motor menu dance internally)
- L: Enable LNA for signal reception (one-time pre-scan setup)
- D: Discover capabilities (returns CAPS:rssi,lna for G2, empty for others)

Non-G2 protocols return RPRT -6 (not available) for R and L commands.
The menu state invariant is maintained after every operation so P commands
continue to work between RSSI reads.
This commit is contained in:
Ryan Malloy 2026-02-11 11:55:05 -07:00
parent da066cfb3b
commit 1192b31166
2 changed files with 76 additions and 0 deletions

View File

@ -50,6 +50,11 @@ class TravlerAntenna:
def config(self) -> AntennaConfig: def config(self) -> AntennaConfig:
return self._config return self._config
@property
def protocol(self) -> FirmwareProtocol:
"""Access the underlying firmware protocol (for capability checks)."""
return self._protocol
@property @property
def is_connected(self) -> bool: def is_connected(self) -> bool:
return self._protocol.is_connected return self._protocol.is_connected

View File

@ -8,6 +8,12 @@ Hamlib clients) use for AZ/EL rotor control:
S stop / disconnect S stop / disconnect
_ get model name _ get model name
q quit connection q quit connection
Extended commands for sky-scan integration (CarryoutG2 only):
R read RSSI signal strength ("R [iterations]")
L enable LNA for signal reception
D discover supported protocol extensions
""" """
from __future__ import annotations from __future__ import annotations
@ -16,6 +22,7 @@ import logging
import socket import socket
from travler_rotor.antenna import TravlerAntenna from travler_rotor.antenna import TravlerAntenna
from travler_rotor.protocol import CarryoutG2Protocol
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -97,6 +104,12 @@ class RotctldServer:
self._handle_model_name(conn) self._handle_model_name(conn)
elif cmd == "q": elif cmd == "q":
break break
elif cmd == "R":
self._handle_read_rssi(conn, cmd_parts)
elif cmd == "L":
self._handle_enable_lna(conn)
elif cmd == "D":
self._handle_capabilities(conn)
else: else:
logger.warning("Unknown command: %s", cmd) logger.warning("Unknown command: %s", cmd)
conn.sendall(b"RPRT -1\n") conn.sendall(b"RPRT -1\n")
@ -133,3 +146,61 @@ class RotctldServer:
def _handle_model_name(self, conn: socket.socket) -> None: def _handle_model_name(self, conn: socket.socket) -> None:
"""Respond to '_' — return model identification string.""" """Respond to '_' — return model identification string."""
conn.sendall(f"{MODEL_NAME}\n".encode()) conn.sendall(f"{MODEL_NAME}\n".encode())
def _handle_read_rssi(self, conn: socket.socket, parts: list[str]) -> None:
"""Respond to 'R [n]' — read RSSI signal strength.
Requires CarryoutG2Protocol. Handles the DVB menu switching internally:
motor menu -> quit -> dvb menu -> rssi -> quit dvb -> motor menu.
Non-G2 rotors return RPRT -6 (not available).
"""
if not isinstance(self._antenna.protocol, CarryoutG2Protocol):
conn.sendall(b"RPRT -6\n")
return
try:
protocol: CarryoutG2Protocol = self._antenna.protocol # type: ignore[assignment]
iterations = 10
if len(parts) > 1:
iterations = int(parts[1])
protocol.quit_submenu()
protocol.enter_dvb_menu()
reading = protocol.get_rssi(iterations)
protocol.quit_submenu()
protocol.enter_motor_menu()
response = f"{reading.reads}\n{reading.average}\n{reading.current}\n"
conn.sendall(response.encode("utf-8"))
except Exception:
logger.exception("Failed to read RSSI")
conn.sendall(b"RPRT -1\n")
def _handle_enable_lna(self, conn: socket.socket) -> None:
"""Respond to 'L' — enable LNA for signal reception.
One-time setup before scanning. Requires CarryoutG2Protocol.
"""
if not isinstance(self._antenna.protocol, CarryoutG2Protocol):
conn.sendall(b"RPRT -6\n")
return
try:
protocol: CarryoutG2Protocol = self._antenna.protocol # type: ignore[assignment]
protocol.quit_submenu()
protocol.enter_dvb_menu()
protocol.enable_lna()
protocol.quit_submenu()
protocol.enter_motor_menu()
conn.sendall(b"RPRT 0\n")
except Exception:
logger.exception("Failed to enable LNA")
conn.sendall(b"RPRT -1\n")
def _handle_capabilities(self, conn: socket.socket) -> None:
"""Respond to 'D' — discover supported protocol extensions."""
if isinstance(self._antenna.protocol, CarryoutG2Protocol):
conn.sendall(b"CAPS:rssi,lna\n")
else:
conn.sendall(b"CAPS:\n")