"""Thread-safe bridge between Birdcage TUI and CarryoutG2Protocol. Wraps all serial I/O in a threading.Lock so the TUI's worker threads don't stomp on each other. Tracks the current firmware submenu to minimize unnecessary q-then-reenter transitions. """ import contextlib import logging import re import threading from enum import Enum, auto from birdcage.protocol import CarryoutG2Protocol logger = logging.getLogger(__name__) class Menu(Enum): """Firmware submenu states.""" ROOT = auto() MOT = auto() DVB = auto() NVS = auto() A3981 = auto() ADC = auto() OS = auto() STEP = auto() PEAK = auto() EEPROM = auto() GPIO = auto() LATLON = auto() DIPSWITCH = auto() UNKNOWN = auto() # Map Menu enum to the command that enters it from root. _MENU_COMMANDS: dict[Menu, str] = { Menu.MOT: "mot", Menu.DVB: "dvb", Menu.NVS: "nvs", Menu.A3981: "a3981", Menu.ADC: "adc", Menu.OS: "os", Menu.STEP: "step", Menu.PEAK: "peak", Menu.EEPROM: "eeprom", Menu.GPIO: "gpio", Menu.LATLON: "latlon", Menu.DIPSWITCH: "dipswitch", } class SerialBridge: """Thread-safe wrapper around CarryoutG2Protocol for TUI consumption. All public methods acquire a lock before touching the serial port. The bridge tracks the current firmware submenu so it can skip redundant quit-and-reenter cycles. """ def __init__(self, protocol: CarryoutG2Protocol) -> None: self._proto = protocol self._lock = threading.Lock() self._cancel = threading.Event() self._menu = Menu.UNKNOWN self._connected = False # ------------------------------------------------------------------ # Menu prompt → string mapping for status display # ------------------------------------------------------------------ _MENU_PROMPTS: dict[Menu, str] = { Menu.ROOT: "TRK>", Menu.MOT: "MOT>", Menu.DVB: "DVB>", Menu.NVS: "NVS>", Menu.A3981: "A3981>", Menu.ADC: "ADC>", Menu.OS: "OS>", Menu.STEP: "STEP>", Menu.PEAK: "PEAK>", Menu.EEPROM: "EE>", Menu.GPIO: "GPIO>", Menu.LATLON: "LATLON>", Menu.DIPSWITCH: "DIPSWITCH>", Menu.UNKNOWN: "???", } @property def current_menu(self) -> str: """Current firmware prompt string for status display.""" return self._MENU_PROMPTS.get(self._menu, "???") # ------------------------------------------------------------------ # Internal helpers # ------------------------------------------------------------------ def _send(self, cmd: str) -> str: """Send a command via the protocol's prompt-terminated send. Caller must hold ``_lock``. """ return self._proto.send_raw(cmd) def _go_to_root(self) -> None: """Return to TRK> root menu. Caller must hold ``_lock``.""" self._proto.reset_to_root() self._menu = Menu.ROOT def _detect_menu(self) -> Menu: """Probe firmware prompt and return the corresponding Menu enum. Caller must hold ``_lock``. """ prompt = self._proto._probe_prompt() upper = prompt.upper() # Check against known prompt strings (handles EE> vs EEPROM>, etc.) for menu, prompt_str in self._MENU_PROMPTS.items(): if menu == Menu.UNKNOWN: continue if prompt_str.upper() in upper: return menu return Menu.UNKNOWN def _ensure_menu(self, target: Menu) -> None: """Navigate to *target* submenu if not already there. Caller must hold ``_lock``. """ if self._menu == target: return # Always go back to root first — we don't know how to go # directly between arbitrary submenus. if self._menu != Menu.ROOT: self._go_to_root() if target == Menu.ROOT: return cmd = _MENU_COMMANDS.get(target) if cmd is None: raise ValueError(f"No entry command for menu {target!r}") self._send(cmd) self._menu = target # ------------------------------------------------------------------ # Connection # ------------------------------------------------------------------ def connect(self, port: str, baudrate: int = 115200) -> None: """Open the RS-422 serial connection.""" with self._lock: self._proto.connect(port, baudrate) self._connected = True self._menu = self._detect_menu() def cancel_operation(self) -> None: """Signal any in-progress long-running operation to abort. Safe to call from any thread. The cancel event is checked every ~2 seconds by ``send_with_timeout``. """ self._cancel.set() def clear_cancel(self) -> None: """Reset the cancel event so future operations proceed normally.""" self._cancel.clear() def disconnect(self) -> None: """Close the serial connection. Signals cancellation first to unblock any long-running serial reads (e.g. firmware sweep), then acquires the lock to close the port cleanly. """ self._cancel.set() if not self._lock.acquire(timeout=5): # Lock held by dead/stuck worker — force-close the port # so the blocked serial read raises an exception. logger.warning("Lock acquisition timed out, force-closing port") with contextlib.suppress(Exception): self._proto.disconnect() self._connected = False self._menu = Menu.UNKNOWN self._cancel.clear() return try: with contextlib.suppress(Exception): self._go_to_root() self._proto.disconnect() self._connected = False self._menu = Menu.UNKNOWN self._cancel.clear() finally: self._lock.release() @property def is_connected(self) -> bool: return self._connected and self._proto.is_connected def initialize(self, skip_init: bool = False) -> None: """Prepare the dish for motor commands. Args: skip_init: If True, skip the protocol initialize step (useful when re-connecting to an already-running dish). """ with self._lock: if not skip_init: self._proto.initialize() self._menu = Menu.MOT # initialize() ends in MOT> # else: leave _menu as whatever connect() detected # ------------------------------------------------------------------ # Motor (MOT>) # ------------------------------------------------------------------ def get_position(self) -> dict[str, float]: """Query current AZ/EL position. Returns: ``{"azimuth": float, "elevation": float}`` """ with self._lock: self._ensure_menu(Menu.MOT) response = self._send("a") az_m = re.search(r"Angle\[0\]\s*=\s*(-?\d+\.?\d*)", response) el_m = re.search(r"Angle\[1\]\s*=\s*(-?\d+\.?\d*)", response) if not az_m or not el_m: raise ValueError(f"Could not parse position: {response!r}") return { "azimuth": float(az_m.group(1)), "elevation": float(el_m.group(1)), } def move_to(self, az: float, el: float) -> None: """Move the dish to an absolute AZ/EL position.""" with self._lock: self._ensure_menu(Menu.MOT) self._send(f"a 0 {az}") self._send(f"a 1 {el}") def move_motor(self, motor_id: int, degrees: float) -> None: """Move a single motor to an absolute position.""" with self._lock: self._ensure_menu(Menu.MOT) self._send(f"a {motor_id} {degrees}") def home_motor(self, motor_id: int) -> None: """Home a motor to its reference position.""" with self._lock: self._ensure_menu(Menu.MOT) self._send(f"h {motor_id}") def engage(self) -> None: """Engage (energize) the stepper motors.""" with self._lock: self._ensure_menu(Menu.MOT) self._send("e") def release(self) -> None: """Release (de-energize) the stepper motors.""" with self._lock: self._ensure_menu(Menu.MOT) self._send("r") def get_motor_list(self) -> str: """List motors and their state.""" with self._lock: self._ensure_menu(Menu.MOT) return self._send("l") def get_motor_dynamics(self) -> dict[str, float]: """Read max velocity and acceleration for both axes. Returns: ``{"az_max_vel": float, "el_max_vel": float, "az_accel": float, "el_accel": float}`` """ with self._lock: self._ensure_menu(Menu.MOT) mv_resp = self._send("mv") ma_resp = self._send("ma") result: dict[str, float] = { "az_max_vel": 0.0, "el_max_vel": 0.0, "az_accel": 0.0, "el_accel": 0.0, } # mv → "Max Vel [0] = 65.0 Max Vel [1] = 45.0" or similar vel_matches = re.findall(r"Max Vel\s*\[(\d)\]\s*=\s*(-?\d+\.?\d*)", mv_resp) for motor_id, val in vel_matches: if motor_id == "0": result["az_max_vel"] = float(val) elif motor_id == "1": result["el_max_vel"] = float(val) # ma → "Accel[0] = 400.0 Accel[1] = 400.0" acc_matches = re.findall(r"Accel\[(\d)\]\s*=\s*(-?\d+\.?\d*)", ma_resp) for motor_id, val in acc_matches: if motor_id == "0": result["az_accel"] = float(val) elif motor_id == "1": result["el_accel"] = float(val) return result def set_max_velocity(self, motor_id: int, deg_per_sec: float) -> None: """Set max velocity for a motor axis (°/s). Firmware: MOT> mv [motor] [vel].""" with self._lock: self._ensure_menu(Menu.MOT) self._send(f"mv {motor_id} {deg_per_sec:.1f}") def set_max_acceleration(self, motor_id: int, accel: float) -> None: """Set max acceleration for a motor axis. Firmware: MOT> ma [motor] [accel] (°/s²). """ with self._lock: self._ensure_menu(Menu.MOT) self._send(f"ma {motor_id} {accel:.1f}") def get_motor_life(self) -> str: """Read motor lifetime / usage statistics.""" with self._lock: self._ensure_menu(Menu.MOT) return self._send("life") def get_el_limits(self) -> dict[str, float]: """Read elevation min, max, and home angles. Firmware returns centidegrees: ``Min: 1800 Max: 6500 Home: 6500`` Returns: ``{"min": 18.0, "max": 65.0, "home": 65.0}`` """ with self._lock: self._ensure_menu(Menu.MOT) response = self._send("elminmaxhome") result: dict[str, float] = {"min": 0.0, "max": 0.0, "home": 0.0} min_m = re.search(r"Min:\s*(\d+)", response) max_m = re.search(r"Max:\s*(\d+)", response) home_m = re.search(r"Home:\s*(\d+)", response) if min_m: result["min"] = int(min_m.group(1)) / 100.0 if max_m: result["max"] = int(max_m.group(1)) / 100.0 if home_m: result["home"] = int(home_m.group(1)) / 100.0 return result def get_step_positions(self) -> dict[str, int]: """Read raw step positions for both axes. Firmware returns: ``Position[0] = 19998 Position[1] = 3116`` Returns: ``{"az_steps": int, "el_steps": int}`` """ with self._lock: self._ensure_menu(Menu.MOT) response = self._send("p") result: dict[str, int] = {"az_steps": 0, "el_steps": 0} matches = re.findall(r"Position\[(\d)\]\s*=\s*(-?\d+)", response) for motor_id, val in matches: if motor_id == "0": result["az_steps"] = int(val) elif motor_id == "1": result["el_steps"] = int(val) return result def get_pid_gains(self) -> dict[str, dict[str, float]]: """Read PID gains for both motor axes. Firmware returns: ``Kp=600 Kv=60 Ki=1`` per motor. Returns: ``{"az": {"kp": 600, "kv": 60, "ki": 1}, "el": {"kp": 250, "kv": 50, "ki": 1}}`` """ with self._lock: self._ensure_menu(Menu.MOT) response = self._send("pid") # Parse "Kp=600 Kv=60 Ki=1" patterns. The pid command without args # shows both motors. We look for two sets of Kp/Kv/Ki values. kp_matches = re.findall(r"Kp[=:]?\s*(\d+)", response) kv_matches = re.findall(r"Kv[=:]?\s*(\d+)", response) ki_matches = re.findall(r"Ki[=:]?\s*(\d+)", response) result = { "az": {"kp": 600.0, "kv": 60.0, "ki": 1.0}, "el": {"kp": 250.0, "kv": 50.0, "ki": 1.0}, } if len(kp_matches) >= 2: result["az"]["kp"] = float(kp_matches[0]) result["el"]["kp"] = float(kp_matches[1]) elif len(kp_matches) == 1: result["az"]["kp"] = float(kp_matches[0]) if len(kv_matches) >= 2: result["az"]["kv"] = float(kv_matches[0]) result["el"]["kv"] = float(kv_matches[1]) elif len(kv_matches) == 1: result["az"]["kv"] = float(kv_matches[0]) if len(ki_matches) >= 2: result["az"]["ki"] = float(ki_matches[0]) result["el"]["ki"] = float(ki_matches[1]) elif len(ki_matches) == 1: result["az"]["ki"] = float(ki_matches[0]) return result def set_pid_gains(self, motor_id: int, kp: float, kv: float, ki: float) -> None: """Write PID gains for a single motor axis. Args: motor_id: 0 for AZ, 1 for EL. kp: Proportional gain. kv: Velocity gain. ki: Integral gain. """ with self._lock: self._ensure_menu(Menu.MOT) self._send(f"pid {motor_id} {int(kp)} {int(kv)} {int(ki)}") def az_sweep_firmware( self, start_az: float, span: float, step_cdeg: int, num_xponders: int, timeout: float = 120, ) -> list[dict[str, float]]: """Execute a firmware-accelerated AZ sweep via azscanwxp. Moves to *start_az* first, then runs the firmware sweep command which handles motor movement and RSSI measurement atomically — no per-point serial round-trips. Args: start_az: Starting azimuth in degrees. span: Total sweep width in degrees. step_cdeg: Step size in centidegrees (100 = 1.00°). num_xponders: Number of transponders to cycle per position. timeout: Serial read timeout for the long-running command. Returns: List of dicts with keys: az, rssi, lock, snr. """ with self._lock: self._ensure_menu(Menu.MOT) # Move to start position and wait for prompt. self._send(f"a 0 {start_az}") # Execute firmware sweep with extended timeout. # Pass cancel event so disconnect() can interrupt the read. response = self._proto.send_with_timeout( f"azscanwxp 0 {span} {step_cdeg} {num_xponders}", timeout=timeout, cancel=self._cancel, ) # Parse streaming output lines. # Motor: Angle: RSSI: Lock:<0/1> SNR: results: list[dict[str, float]] = [] for match in re.finditer( r"Angle:(-?\d+)\s+RSSI:(\d+)\s+Lock:(\d)\s+SNR:(-?\d+\.?\d*)", response, ): results.append( { "az": int(match.group(1)) / 100.0, "rssi": float(match.group(2)), "lock": float(match.group(3)), "snr": float(match.group(4)), } ) logger.info("Firmware sweep returned %d points", len(results)) return results # ------------------------------------------------------------------ # Signal (DVB>) # ------------------------------------------------------------------ def get_rssi(self, iterations: int = 10) -> dict[str, int]: """Read averaged RSSI signal strength. Returns: ``{"reads": int, "average": int, "current": int}`` """ with self._lock: self._ensure_menu(Menu.DVB) response = self._send(f"rssi {iterations}") match = re.search( r"Reads:(\d+)\s+RSSI\[avg:\s*(\d+)\s+cur:\s*(\d+)\]", response, ) if match: return { "reads": int(match.group(1)), "average": int(match.group(2)), "current": int(match.group(3)), } raise ValueError(f"Could not parse RSSI: {response!r}") def enable_lna(self) -> None: """Enable LNA in ODU mode (sets LNB to 13V).""" with self._lock: self._ensure_menu(Menu.DVB) self._send("lnbdc odu") def get_lock_status(self) -> str: """Read quick lock status (single-shot).""" with self._lock: self._ensure_menu(Menu.DVB) return self._send("qls") def get_dvb_config(self) -> str: """Read BCM hardware/firmware version.""" with self._lock: self._ensure_menu(Menu.DVB) return self._send("config") def get_channel_params(self) -> str: """Read current channel parameters.""" with self._lock: self._ensure_menu(Menu.DVB) return self._send("dis") # ------------------------------------------------------------------ # A3981 # ------------------------------------------------------------------ def get_a3981_diag(self) -> str: """Read A3981 diagnostic/fault status.""" with self._lock: self._ensure_menu(Menu.A3981) return self._send("diag") def get_a3981_modes(self) -> dict[str, str]: """Read A3981 step mode, current mode, and step size. Returns: ``{"step_mode": str, "current_mode": str, "step_size": str}`` """ with self._lock: self._ensure_menu(Menu.A3981) sm_resp = self._send("sm") cm_resp = self._send("cm") ss_resp = self._send("ss") return { "step_mode": sm_resp, "current_mode": cm_resp, "step_size": ss_resp, } def get_a3981_torque(self) -> str: """Read A3981 torque levels.""" with self._lock: self._ensure_menu(Menu.A3981) return self._send("st") # ------------------------------------------------------------------ # NVS # ------------------------------------------------------------------ def nvs_dump(self) -> str: """Dump all NVS values.""" with self._lock: self._ensure_menu(Menu.NVS) return self._send("d") def nvs_read(self, index: int) -> str: """Read a single NVS value by index.""" with self._lock: self._ensure_menu(Menu.NVS) return self._send(f"e {index}") # ------------------------------------------------------------------ # ADC # ------------------------------------------------------------------ def get_adc_rssi(self) -> str: """Read single-shot ADC RSSI value.""" with self._lock: self._ensure_menu(Menu.ADC) return self._send("rssi") def get_board_id(self) -> str: """Read board identification string.""" with self._lock: self._ensure_menu(Menu.ADC) return self._send("bdid") # ------------------------------------------------------------------ # OS # ------------------------------------------------------------------ def get_firmware_id(self) -> str: """Read full MCU and firmware identification.""" with self._lock: self._ensure_menu(Menu.OS) return self._send("id") # ------------------------------------------------------------------ # Raw / Console # ------------------------------------------------------------------ def send_raw(self, cmd: str) -> str: """Send an arbitrary command and return the raw response. After a raw command, the menu state is marked UNKNOWN because the user may have navigated to a different submenu. """ with self._lock: response = self._send(cmd) self._menu = Menu.UNKNOWN return response