birdcage/src/travler_rotor/protocol.py
Ryan Malloy c93bbef26d Initial travler-rotor library scaffolding
Extract Gabe Emerson's Trav'ler rotor scripts into a proper Python
library with firmware protocol abstraction (HAL 2.05 + HAL 0.0.00),
Hamlib rotctld TCP server, Click CLI, and isolated leap-frog algorithm
with the elevation copy-paste bug fixed.
2026-02-11 04:10:17 -07:00

245 lines
7.6 KiB
Python

"""Firmware protocol abstraction for Winegard Trav'ler RS-485 communication.
Each firmware version (HAL) uses slightly different serial commands, boot
signals, and submenu structures. This module defines an abstract protocol
and concrete implementations so the rest of the library doesn't care which
firmware is on the other end of the wire.
"""
from __future__ import annotations
import logging
import re
import time
from abc import ABC, abstractmethod
from collections.abc import Callable
from dataclasses import dataclass
import serial
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Motor IDs used as the <id> field of "a <id> <degrees>" move commands.
MOTOR_AZIMUTH = 0
MOTOR_ELEVATION = 1
@dataclass
class Position:
    """Pointing angles of the dish as reported by the firmware.

    Azimuth and elevation are always present; skew is optional and stays
    ``None`` when no value is supplied.
    """

    # Azimuth angle.
    azimuth: float
    # Elevation angle.
    elevation: float
    # Skew angle, or None when it is not available.
    skew: float | None = None
class FirmwareProtocol(ABC):
    """Abstract base for Winegard firmware communication over RS-485.

    Owns the serial port and implements the commands shared by every
    firmware in this module (position query, single-motor move, return to
    the root menu). Subclasses supply the firmware-specific boot wait,
    search kill and motor-menu navigation.
    """

    def __init__(self) -> None:
        # No port is opened until connect() is called.
        self._serial: serial.Serial | None = None

    @property
    def is_connected(self) -> bool:
        """Whether a serial port object exists and reports itself open."""
        return self._serial is not None and self._serial.is_open

    def connect(self, port: str, baudrate: int = 57600) -> None:
        """Open the RS-485 serial connection.

        The port is opened 8-N-1 with a one second read timeout.

        Args:
            port: Serial device name passed to ``serial.Serial``.
            baudrate: Line speed; defaults to 57600.
        """
        self._serial = serial.Serial(
            port=port,
            baudrate=baudrate,
            parity=serial.PARITY_NONE,
            stopbits=serial.STOPBITS_ONE,
            bytesize=serial.EIGHTBITS,
            timeout=1,
        )
        logger.info("Connected on %s at %d baud", port, baudrate)

    def disconnect(self) -> None:
        """Close the serial connection.

        If the port is open, the firmware is first sent back to its root
        menu. The port is closed and the reference dropped even when that
        final command fails; the failure is still propagated to the caller.
        """
        port = self._serial
        if port is None:
            return
        try:
            if port.is_open:
                try:
                    self.reset_to_root()
                finally:
                    # Always release the OS handle, even if the write failed.
                    port.close()
                logger.info("Disconnected")
        finally:
            # Never keep a stale handle around after a disconnect attempt.
            self._serial = None

    def _require_serial(self) -> serial.Serial:
        """Return the serial port, or raise RuntimeError when not connected."""
        if self._serial is None:
            raise RuntimeError("Not connected")
        return self._serial

    def _write(self, cmd: str) -> None:
        """Send a command string followed by carriage return.

        Raises:
            RuntimeError: If connect() has not been called.
        """
        self._require_serial().write(f"{cmd}\r".encode("ascii"))

    def _read(self, size: int = 200) -> str:
        """Read up to `size` bytes from serial, decode with error tolerance.

        Undecodable bytes are dropped and surrounding whitespace is stripped.

        Raises:
            RuntimeError: If connect() has not been called.
        """
        port = self._require_serial()
        # Flush the port before blocking on the reply.
        port.flush()
        return port.read(size).decode(errors="ignore").strip()

    def _readline(self) -> str:
        """Read a single line from serial, decoded and stripped.

        Raises:
            RuntimeError: If connect() has not been called.
        """
        return self._require_serial().readline().decode(errors="ignore").strip()

    def reset_to_root(self) -> None:
        """Return to the firmware root menu."""
        self._write("q")
        self._write("")  # clear prompt

    @abstractmethod
    def initialize(self, callback: Callable[[str], None] | None = None) -> None:
        """Wait for boot to complete and kill the satellite search task.

        Args:
            callback: Optional function called with each status line
                received during boot (useful for progress display).
        """

    @abstractmethod
    def enter_motor_menu(self) -> None:
        """Navigate into the motor control submenu."""

    @staticmethod
    def _parse_angle(label: str, reply: str) -> float | None:
        """Extract the number reported as ``<label> = <number>`` in *reply*.

        Accepts an optional sign and an optional fractional part, so
        readings such as ``-12.50`` or ``30`` are parsed as well as
        ``180.25``.

        Returns:
            The parsed value, or None when *label* is not present.
        """
        found = re.search(
            rf"{re.escape(label)} =\s*([-+]?\d+(?:\.\d+)?)", reply
        )
        return float(found.group(1)) if found else None

    def get_position(self) -> Position:
        """Query the dish for its current AZ/EL/SK position.

        Sends the bare ``a`` command and parses the ``AZ =``, ``EL =`` and
        ``SK =`` fields from the reply. Skew is optional in the reply.

        Raises:
            ValueError: If azimuth or elevation cannot be parsed.
            RuntimeError: If not connected.
        """
        self._write("a")
        reply = self._read()
        azimuth = self._parse_angle("AZ", reply)
        elevation = self._parse_angle("EL", reply)
        if azimuth is None or elevation is None:
            raise ValueError(f"Could not parse position from: {reply!r}")
        return Position(
            azimuth=azimuth,
            elevation=elevation,
            skew=self._parse_angle("SK", reply),
        )

    def move_motor(self, motor_id: int, degrees: float) -> None:
        """Command a single motor to an absolute position.

        Args:
            motor_id: MOTOR_AZIMUTH (0) or MOTOR_ELEVATION (1).
            degrees: Target angle in degrees.
        """
        self._write(f"a {motor_id} {degrees}")

    @abstractmethod
    def kill_search(self) -> None:
        """Cancel the firmware's automatic TV satellite search."""
class HAL205Protocol(FirmwareProtocol):
    """Protocol driver for HAL 2.05.003 firmware.

    Boot signals: "NoGPS" or "No LNB Voltage"
    Motor submenu: "motor"
    Search kill: ngsearch -> s -> q
    """

    BOOT_SIGNALS = ("NoGPS", "No LNB Voltage")
    MOTOR_COMMAND = "motor"

    def initialize(self, callback: Callable[[str], None] | None = None) -> None:
        """Block until a boot signal is seen, then cancel the search task.

        Every non-empty line read while waiting is passed to *callback*
        (when given) and logged at debug level. Once a line contains one of
        BOOT_SIGNALS, the search is killed and the root menu restored.
        """
        logger.info("Waiting for HAL 2.05 boot (ensure IDU is powered on)...")
        booted = False
        while not booted:
            status = self._readline()
            if status:
                if callback:
                    callback(status)
                logger.debug("Boot: %s", status)
                for marker in self.BOOT_SIGNALS:
                    if marker in status:
                        logger.info("Boot complete — homing finished")
                        booted = True
                        break
        self.kill_search()
        self.reset_to_root()

    def kill_search(self) -> None:
        """Cancel the satellite search: root menu, ngsearch, s, then q."""
        self.reset_to_root()
        # Each of the first two commands is followed by a short pause.
        for step in ("ngsearch", "s"):
            self._write(step)
            time.sleep(0.2)
        self._write("q")
        self._write("")
        logger.info("Search task cancelled")

    def enter_motor_menu(self) -> None:
        """Go to the root menu, then open the "motor" submenu."""
        self.reset_to_root()
        self._write(self.MOTOR_COMMAND)
class HAL000Protocol(FirmwareProtocol):
    """Protocol driver for HAL 0.0.00 firmware.

    Uses shorter command names and a different init sequence.
    Motor submenu: "mot"
    """

    MOTOR_COMMAND = "mot"

    def initialize(self, callback: Callable[[str], None] | None = None) -> None:
        """Block until a boot line is seen, then cancel the search task.

        Every non-empty line read while waiting is passed to *callback*
        (when given) and logged at debug level.
        """
        # The HAL 0.0.00 boot sequence differs from HAL 2.05 and its exact
        # signals are not documented in the upstream repo. Treat this as a
        # best-effort implementation to be validated against real hardware.
        logger.info("Waiting for HAL 0.0.00 boot...")
        while True:
            status = self._readline()
            if status:
                if callback:
                    callback(status)
                logger.debug("Boot: %s", status)
                # Boot detection for HAL 0.0.00 — adjust if hardware
                # reveals different signals.
                if "NoGPS" in status or "ready" in status.lower():
                    logger.info("Boot complete")
                    break
        self.kill_search()
        self.reset_to_root()

    def kill_search(self) -> None:
        """Fall back to a root-menu reset in place of a search-kill sequence."""
        # HAL 0.0.00 may use a different search-kill sequence; returning to
        # the root menu is the safe default chosen here.
        self.reset_to_root()
        logger.info("Search task cancelled (HAL 0.0.00)")

    def enter_motor_menu(self) -> None:
        """Go to the root menu, then open the "mot" submenu."""
        self.reset_to_root()
        self._write(self.MOTOR_COMMAND)
# Registry for firmware lookup by name: maps each short, lowercase firmware
# name accepted by get_protocol() to the protocol class that implements it.
FIRMWARE_REGISTRY: dict[str, type[FirmwareProtocol]] = {
"hal205": HAL205Protocol,
"hal000": HAL000Protocol,
}
def get_protocol(name: str) -> FirmwareProtocol:
    """Instantiate a firmware protocol by short name.

    The lookup is case-insensitive.

    Args:
        name: One of "hal205", "hal000".

    Returns:
        A new instance of the protocol class registered under *name*.

    Raises:
        KeyError: If the firmware name is not recognized.
    """
    # Guard only the registry lookup, so a KeyError raised while constructing
    # the protocol is not misreported as an unknown firmware name.
    try:
        protocol_cls = FIRMWARE_REGISTRY[name.lower()]
    except KeyError:
        available = ", ".join(sorted(FIRMWARE_REGISTRY))
        raise KeyError(f"Unknown firmware {name!r}. Available: {available}") from None
    return protocol_cls()