"""Thread-safe bridge between Birdcage TUI and CarryoutG2Protocol. Wraps all serial I/O in a threading.Lock so the TUI's worker threads don't stomp on each other. Tracks the current firmware submenu to minimize unnecessary q-then-reenter transitions. """ import contextlib import logging import re import threading from enum import Enum, auto from birdcage.protocol import CarryoutG2Protocol logger = logging.getLogger(__name__) class Menu(Enum): """Firmware submenu states.""" ROOT = auto() MOT = auto() DVB = auto() NVS = auto() A3981 = auto() ADC = auto() OS = auto() STEP = auto() PEAK = auto() EEPROM = auto() GPIO = auto() LATLON = auto() DIPSWITCH = auto() UNKNOWN = auto() # Map Menu enum to the command that enters it from root. _MENU_COMMANDS: dict[Menu, str] = { Menu.MOT: "mot", Menu.DVB: "dvb", Menu.NVS: "nvs", Menu.A3981: "a3981", Menu.ADC: "adc", Menu.OS: "os", Menu.STEP: "step", Menu.PEAK: "peak", Menu.EEPROM: "eeprom", Menu.GPIO: "gpio", Menu.LATLON: "latlon", Menu.DIPSWITCH: "dipswitch", } class SerialBridge: """Thread-safe wrapper around CarryoutG2Protocol for TUI consumption. All public methods acquire a lock before touching the serial port. The bridge tracks the current firmware submenu so it can skip redundant quit-and-reenter cycles. """ def __init__(self, protocol: CarryoutG2Protocol) -> None: self._proto = protocol self._lock = threading.Lock() self._menu = Menu.UNKNOWN self._connected = False # ------------------------------------------------------------------ # Internal helpers # ------------------------------------------------------------------ def _send(self, cmd: str) -> str: """Send a command via the protocol's prompt-terminated send. Caller must hold ``_lock``. """ return self._proto.send_raw(cmd) def _go_to_root(self) -> None: """Return to TRK> root menu. Caller must hold ``_lock``.""" self._proto.reset_to_root() self._menu = Menu.ROOT def _ensure_menu(self, target: Menu) -> None: """Navigate to *target* submenu if not already there. Caller must hold ``_lock``. """ if self._menu == target: return # Always go back to root first — we don't know how to go # directly between arbitrary submenus. if self._menu != Menu.ROOT: self._go_to_root() if target == Menu.ROOT: return cmd = _MENU_COMMANDS.get(target) if cmd is None: raise ValueError(f"No entry command for menu {target!r}") self._send(cmd) self._menu = target # ------------------------------------------------------------------ # Connection # ------------------------------------------------------------------ def connect(self, port: str, baudrate: int = 115200) -> None: """Open the RS-422 serial connection.""" with self._lock: self._proto.connect(port, baudrate) self._connected = True self._menu = Menu.UNKNOWN def disconnect(self) -> None: """Close the serial connection.""" with self._lock: with contextlib.suppress(Exception): self._go_to_root() self._proto.disconnect() self._connected = False self._menu = Menu.UNKNOWN @property def is_connected(self) -> bool: return self._connected and self._proto.is_connected def initialize(self, skip_init: bool = False) -> None: """Prepare the dish for motor commands. Args: skip_init: If True, skip the protocol initialize step (useful when re-connecting to an already-running dish). """ with self._lock: if not skip_init: self._proto.initialize() self._menu = Menu.MOT # initialize() ends in MOT> # ------------------------------------------------------------------ # Motor (MOT>) # ------------------------------------------------------------------ def get_position(self) -> dict[str, float]: """Query current AZ/EL position. Returns: ``{"azimuth": float, "elevation": float}`` """ with self._lock: self._ensure_menu(Menu.MOT) response = self._send("a") az_m = re.search(r"Angle\[0\]\s*=\s*(-?\d+\.?\d*)", response) el_m = re.search(r"Angle\[1\]\s*=\s*(-?\d+\.?\d*)", response) if not az_m or not el_m: raise ValueError(f"Could not parse position: {response!r}") return { "azimuth": float(az_m.group(1)), "elevation": float(el_m.group(1)), } def move_to(self, az: float, el: float) -> None: """Move the dish to an absolute AZ/EL position.""" with self._lock: self._ensure_menu(Menu.MOT) self._send(f"a 0 {az}") self._send(f"a 1 {el}") def move_motor(self, motor_id: int, degrees: float) -> None: """Move a single motor to an absolute position.""" with self._lock: self._ensure_menu(Menu.MOT) self._send(f"a {motor_id} {degrees}") def home_motor(self, motor_id: int) -> None: """Home a motor to its reference position.""" with self._lock: self._ensure_menu(Menu.MOT) self._send(f"h {motor_id}") def engage(self) -> None: """Engage (energize) the stepper motors.""" with self._lock: self._ensure_menu(Menu.MOT) self._send("e") def release(self) -> None: """Release (de-energize) the stepper motors.""" with self._lock: self._ensure_menu(Menu.MOT) self._send("r") def get_motor_list(self) -> str: """List motors and their state.""" with self._lock: self._ensure_menu(Menu.MOT) return self._send("l") def get_motor_dynamics(self) -> dict[str, float]: """Read max velocity and acceleration for both axes. Returns: ``{"az_max_vel": float, "el_max_vel": float, "az_accel": float, "el_accel": float}`` """ with self._lock: self._ensure_menu(Menu.MOT) mv_resp = self._send("mv") ma_resp = self._send("ma") result: dict[str, float] = { "az_max_vel": 0.0, "el_max_vel": 0.0, "az_accel": 0.0, "el_accel": 0.0, } # mv → "Max Vel [0] = 65.0 Max Vel [1] = 45.0" or similar vel_matches = re.findall(r"Max Vel\s*\[(\d)\]\s*=\s*(-?\d+\.?\d*)", mv_resp) for motor_id, val in vel_matches: if motor_id == "0": result["az_max_vel"] = float(val) elif motor_id == "1": result["el_max_vel"] = float(val) # ma → "Accel[0] = 400.0 Accel[1] = 400.0" acc_matches = re.findall(r"Accel\[(\d)\]\s*=\s*(-?\d+\.?\d*)", ma_resp) for motor_id, val in acc_matches: if motor_id == "0": result["az_accel"] = float(val) elif motor_id == "1": result["el_accel"] = float(val) return result def get_motor_life(self) -> str: """Read motor lifetime / usage statistics.""" with self._lock: self._ensure_menu(Menu.MOT) return self._send("life") def get_el_limits(self) -> dict[str, float]: """Read elevation min, max, and home angles. Firmware returns centidegrees: ``Min: 1800 Max: 6500 Home: 6500`` Returns: ``{"min": 18.0, "max": 65.0, "home": 65.0}`` """ with self._lock: self._ensure_menu(Menu.MOT) response = self._send("elminmaxhome") result: dict[str, float] = {"min": 0.0, "max": 0.0, "home": 0.0} min_m = re.search(r"Min:\s*(\d+)", response) max_m = re.search(r"Max:\s*(\d+)", response) home_m = re.search(r"Home:\s*(\d+)", response) if min_m: result["min"] = int(min_m.group(1)) / 100.0 if max_m: result["max"] = int(max_m.group(1)) / 100.0 if home_m: result["home"] = int(home_m.group(1)) / 100.0 return result def get_step_positions(self) -> dict[str, int]: """Read raw step positions for both axes. Firmware returns: ``Position[0] = 19998 Position[1] = 3116`` Returns: ``{"az_steps": int, "el_steps": int}`` """ with self._lock: self._ensure_menu(Menu.MOT) response = self._send("p") result: dict[str, int] = {"az_steps": 0, "el_steps": 0} matches = re.findall(r"Position\[(\d)\]\s*=\s*(-?\d+)", response) for motor_id, val in matches: if motor_id == "0": result["az_steps"] = int(val) elif motor_id == "1": result["el_steps"] = int(val) return result # ------------------------------------------------------------------ # Signal (DVB>) # ------------------------------------------------------------------ def get_rssi(self, iterations: int = 10) -> dict[str, int]: """Read averaged RSSI signal strength. Returns: ``{"reads": int, "average": int, "current": int}`` """ with self._lock: self._ensure_menu(Menu.DVB) response = self._send(f"rssi {iterations}") match = re.search( r"Reads:(\d+)\s+RSSI\[avg:\s*(\d+)\s+cur:\s*(\d+)\]", response, ) if match: return { "reads": int(match.group(1)), "average": int(match.group(2)), "current": int(match.group(3)), } raise ValueError(f"Could not parse RSSI: {response!r}") def enable_lna(self) -> None: """Enable LNA in ODU mode (sets LNB to 13V).""" with self._lock: self._ensure_menu(Menu.DVB) self._send("lnbdc odu") def get_lock_status(self) -> str: """Read quick lock status (single-shot).""" with self._lock: self._ensure_menu(Menu.DVB) return self._send("qls") def get_dvb_config(self) -> str: """Read BCM hardware/firmware version.""" with self._lock: self._ensure_menu(Menu.DVB) return self._send("config") def get_channel_params(self) -> str: """Read current channel parameters.""" with self._lock: self._ensure_menu(Menu.DVB) return self._send("dis") # ------------------------------------------------------------------ # A3981 # ------------------------------------------------------------------ def get_a3981_diag(self) -> str: """Read A3981 diagnostic/fault status.""" with self._lock: self._ensure_menu(Menu.A3981) return self._send("diag") def get_a3981_modes(self) -> dict[str, str]: """Read A3981 step mode, current mode, and step size. Returns: ``{"step_mode": str, "current_mode": str, "step_size": str}`` """ with self._lock: self._ensure_menu(Menu.A3981) sm_resp = self._send("sm") cm_resp = self._send("cm") ss_resp = self._send("ss") return { "step_mode": sm_resp, "current_mode": cm_resp, "step_size": ss_resp, } def get_a3981_torque(self) -> str: """Read A3981 torque levels.""" with self._lock: self._ensure_menu(Menu.A3981) return self._send("st") # ------------------------------------------------------------------ # NVS # ------------------------------------------------------------------ def nvs_dump(self) -> str: """Dump all NVS values.""" with self._lock: self._ensure_menu(Menu.NVS) return self._send("d") def nvs_read(self, index: int) -> str: """Read a single NVS value by index.""" with self._lock: self._ensure_menu(Menu.NVS) return self._send(f"e {index}") # ------------------------------------------------------------------ # ADC # ------------------------------------------------------------------ def get_adc_rssi(self) -> str: """Read single-shot ADC RSSI value.""" with self._lock: self._ensure_menu(Menu.ADC) return self._send("rssi") def get_board_id(self) -> str: """Read board identification string.""" with self._lock: self._ensure_menu(Menu.ADC) return self._send("bdid") # ------------------------------------------------------------------ # OS # ------------------------------------------------------------------ def get_firmware_id(self) -> str: """Read full MCU and firmware identification.""" with self._lock: self._ensure_menu(Menu.OS) return self._send("id") # ------------------------------------------------------------------ # Raw / Console # ------------------------------------------------------------------ def send_raw(self, cmd: str) -> str: """Send an arbitrary command and return the raw response. After a raw command, the menu state is marked UNKNOWN because the user may have navigated to a different submenu. """ with self._lock: response = self._send(cmd) self._menu = Menu.UNKNOWN return response