"""Firmware protocol abstraction for Winegard Trav'ler RS-485 communication. Each firmware version (HAL) uses slightly different serial commands, boot signals, and submenu structures. This module defines an abstract protocol and concrete implementations so the rest of the library doesn't care which firmware is on the other end of the wire. """ from __future__ import annotations import logging import re import time from abc import ABC, abstractmethod from collections.abc import Callable from dataclasses import dataclass import serial logger = logging.getLogger(__name__) # Motor IDs used in "a " commands MOTOR_AZIMUTH = 0 MOTOR_ELEVATION = 1 @dataclass class Position: """Current dish orientation.""" azimuth: float elevation: float skew: float | None = None class FirmwareProtocol(ABC): """Abstract base for Winegard firmware communication over RS-485.""" def __init__(self) -> None: self._serial: serial.Serial | None = None @property def is_connected(self) -> bool: return self._serial is not None and self._serial.is_open def connect(self, port: str, baudrate: int = 57600) -> None: """Open the RS-485 serial connection.""" self._serial = serial.Serial( port=port, baudrate=baudrate, parity=serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE, bytesize=serial.EIGHTBITS, timeout=1, ) logger.info("Connected on %s at %d baud", port, baudrate) def disconnect(self) -> None: """Close the serial connection.""" if self._serial and self._serial.is_open: self.reset_to_root() self._serial.close() logger.info("Disconnected") self._serial = None def _write(self, cmd: str) -> None: """Send a command string followed by carriage return.""" if not self._serial: raise RuntimeError("Not connected") self._serial.write(f"{cmd}\r".encode("ascii")) def _read(self, size: int = 200) -> str: """Read up to `size` bytes from serial, decode with error tolerance.""" if not self._serial: raise RuntimeError("Not connected") self._serial.flush() return self._serial.read(size).decode(errors="ignore").strip() def _readline(self) -> str: """Read a single line from serial.""" if not self._serial: raise RuntimeError("Not connected") return self._serial.readline().decode(errors="ignore").strip() def reset_to_root(self) -> None: """Return to the firmware root menu.""" self._write("q") self._write("") # clear prompt @abstractmethod def initialize(self, callback: Callable[[str], None] | None = None) -> None: """Wait for boot to complete and kill the satellite search task. Args: callback: Optional function called with each status line received during boot (useful for progress display). """ @abstractmethod def enter_motor_menu(self) -> None: """Navigate into the motor control submenu.""" def get_position(self) -> Position: """Query the dish for its current AZ/EL/SK position.""" self._write("a") reply = self._read() az_match = re.search(r"AZ =\s*(\d+\.\d+)", reply) el_match = re.search(r"EL =\s*(\d+\.\d+)", reply) sk_match = re.search(r"SK =\s*(\d+\.\d+)", reply) if not az_match or not el_match: raise ValueError(f"Could not parse position from: {reply!r}") return Position( azimuth=float(az_match.group(1)), elevation=float(el_match.group(1)), skew=float(sk_match.group(1)) if sk_match else None, ) def move_motor(self, motor_id: int, degrees: float) -> None: """Command a single motor to an absolute position. Args: motor_id: MOTOR_AZIMUTH (0) or MOTOR_ELEVATION (1). degrees: Target angle in degrees. """ self._write(f"a {motor_id} {degrees}") @abstractmethod def kill_search(self) -> None: """Cancel the firmware's automatic TV satellite search.""" class HAL205Protocol(FirmwareProtocol): """HAL 2.05.003 firmware. Boot signals: "NoGPS" or "No LNB Voltage" Motor submenu: "motor" Search kill: ngsearch -> s -> q """ BOOT_SIGNALS = ("NoGPS", "No LNB Voltage") MOTOR_COMMAND = "motor" def initialize(self, callback: Callable[[str], None] | None = None) -> None: logger.info("Waiting for HAL 2.05 boot (ensure IDU is powered on)...") while True: line = self._readline() if not line: continue if callback: callback(line) logger.debug("Boot: %s", line) if any(signal in line for signal in self.BOOT_SIGNALS): logger.info("Boot complete — homing finished") break self.kill_search() self.reset_to_root() def kill_search(self) -> None: self.reset_to_root() self._write("ngsearch") time.sleep(0.2) self._write("s") time.sleep(0.2) self._write("q") self._write("") logger.info("Search task cancelled") def enter_motor_menu(self) -> None: self.reset_to_root() self._write(self.MOTOR_COMMAND) class HAL000Protocol(FirmwareProtocol): """HAL 0.0.00 firmware. Uses shorter command names and a different init sequence. Motor submenu: "mot" """ MOTOR_COMMAND = "mot" def initialize(self, callback: Callable[[str], None] | None = None) -> None: # HAL 0.0.00 has a different boot sequence — the exact signals # are not documented in the upstream repo. This is a best-effort # implementation that should be validated against real hardware. logger.info("Waiting for HAL 0.0.00 boot...") while True: line = self._readline() if not line: continue if callback: callback(line) logger.debug("Boot: %s", line) # HAL 0.0.00 boot detection — adjust if hardware reveals # different signals if "NoGPS" in line or "ready" in line.lower(): logger.info("Boot complete") break self.kill_search() self.reset_to_root() def kill_search(self) -> None: # HAL 0.0.00 may have a different search-kill sequence. # Falling back to root-menu reset as a safe default. self.reset_to_root() logger.info("Search task cancelled (HAL 0.0.00)") def enter_motor_menu(self) -> None: self.reset_to_root() self._write(self.MOTOR_COMMAND) # Registry for firmware lookup by name FIRMWARE_REGISTRY: dict[str, type[FirmwareProtocol]] = { "hal205": HAL205Protocol, "hal000": HAL000Protocol, } def get_protocol(name: str) -> FirmwareProtocol: """Instantiate a firmware protocol by short name. Args: name: One of "hal205", "hal000". Raises: KeyError: If the firmware name is not recognized. """ try: return FIRMWARE_REGISTRY[name.lower()]() except KeyError: available = ", ".join(sorted(FIRMWARE_REGISTRY)) raise KeyError(f"Unknown firmware {name!r}. Available: {available}") from None