birdcage/src/birdcage/bridge.py
Ryan Malloy f8bfd69ceb Add LNB polarity toggle (V-pol 13V / H-pol 18V)
Bridge: set_lnb_voltage(mode) wraps firmware lnbdc command, enable_lna()
now delegates to it. MCP: new set_lnb_voltage tool + 3 tests. TUI: Signal
screen button toggles between V-pol and H-pol instead of one-way LNA enable.
2026-02-17 17:24:05 -07:00

658 lines
22 KiB
Python

"""Thread-safe bridge between consumers and CarryoutG2Protocol.
Wraps all serial I/O in a threading.Lock so concurrent callers
don't stomp on each other. Tracks the current firmware submenu to
minimize unnecessary q-then-reenter transitions.
"""
import contextlib
import logging
import re
import threading
from enum import Enum, auto
from birdcage.protocol import CarryoutG2Protocol
logger = logging.getLogger(__name__)
class Menu(Enum):
"""Firmware submenu states."""
ROOT = auto()
MOT = auto()
DVB = auto()
NVS = auto()
A3981 = auto()
ADC = auto()
OS = auto()
STEP = auto()
PEAK = auto()
EEPROM = auto()
GPIO = auto()
LATLON = auto()
DIPSWITCH = auto()
UNKNOWN = auto()
# Map Menu enum to the command that enters it from root.
_MENU_COMMANDS: dict[Menu, str] = {
Menu.MOT: "mot",
Menu.DVB: "dvb",
Menu.NVS: "nvs",
Menu.A3981: "a3981",
Menu.ADC: "adc",
Menu.OS: "os",
Menu.STEP: "step",
Menu.PEAK: "peak",
Menu.EEPROM: "eeprom",
Menu.GPIO: "gpio",
Menu.LATLON: "latlon",
Menu.DIPSWITCH: "dipswitch",
}
class SerialBridge:
"""Thread-safe wrapper around CarryoutG2Protocol.
All public methods acquire a lock before touching the serial port.
The bridge tracks the current firmware submenu so it can skip
redundant quit-and-reenter cycles.
"""
def __init__(self, protocol: CarryoutG2Protocol) -> None:
self._proto = protocol
self._lock = threading.Lock()
self._cancel = threading.Event()
self._menu = Menu.UNKNOWN
self._connected = False
# ------------------------------------------------------------------
# Menu prompt -> string mapping for status display
# ------------------------------------------------------------------
_MENU_PROMPTS: dict[Menu, str] = {
Menu.ROOT: "TRK>",
Menu.MOT: "MOT>",
Menu.DVB: "DVB>",
Menu.NVS: "NVS>",
Menu.A3981: "A3981>",
Menu.ADC: "ADC>",
Menu.OS: "OS>",
Menu.STEP: "STEP>",
Menu.PEAK: "PEAK>",
Menu.EEPROM: "EE>",
Menu.GPIO: "GPIO>",
Menu.LATLON: "LATLON>",
Menu.DIPSWITCH: "DIPSWITCH>",
Menu.UNKNOWN: "???",
}
@property
def current_menu(self) -> str:
"""Current firmware prompt string for status display."""
return self._MENU_PROMPTS.get(self._menu, "???")
# ------------------------------------------------------------------
# Internal helpers
# ------------------------------------------------------------------
def _send(self, cmd: str) -> str:
"""Send a command via the protocol's prompt-terminated send.
Caller must hold ``_lock``.
"""
return self._proto.send_raw(cmd)
def _go_to_root(self) -> None:
"""Return to TRK> root menu. Caller must hold ``_lock``."""
self._proto.reset_to_root()
self._menu = Menu.ROOT
def _detect_menu(self) -> Menu:
"""Probe firmware prompt and return the corresponding Menu enum.
Caller must hold ``_lock``.
"""
prompt = self._proto._probe_prompt()
upper = prompt.upper()
# Check against known prompt strings (handles EE> vs EEPROM>, etc.)
for menu, prompt_str in self._MENU_PROMPTS.items():
if menu == Menu.UNKNOWN:
continue
if prompt_str.upper() in upper:
return menu
return Menu.UNKNOWN
def _ensure_menu(self, target: Menu) -> None:
"""Navigate to *target* submenu if not already there.
Caller must hold ``_lock``.
"""
if self._menu == target:
return
# Always go back to root first — we don't know how to go
# directly between arbitrary submenus.
if self._menu != Menu.ROOT:
self._go_to_root()
if target == Menu.ROOT:
return
cmd = _MENU_COMMANDS.get(target)
if cmd is None:
raise ValueError(f"No entry command for menu {target!r}")
self._send(cmd)
self._menu = target
# ------------------------------------------------------------------
# Connection
# ------------------------------------------------------------------
def connect(self, port: str, baudrate: int = 115200) -> None:
"""Open the RS-422 serial connection."""
with self._lock:
self._proto.connect(port, baudrate)
self._connected = True
self._menu = self._detect_menu()
def cancel_operation(self) -> None:
"""Signal any in-progress long-running operation to abort.
Safe to call from any thread. The cancel event is checked every
~2 seconds by ``send_with_timeout``.
"""
self._cancel.set()
def clear_cancel(self) -> None:
"""Reset the cancel event so future operations proceed normally."""
self._cancel.clear()
def disconnect(self) -> None:
"""Close the serial connection.
Signals cancellation first to unblock any long-running serial
reads (e.g. firmware sweep), then acquires the lock to close
the port cleanly.
"""
self._cancel.set()
if not self._lock.acquire(timeout=5):
# Lock held by dead/stuck worker — force-close the port
# so the blocked serial read raises an exception.
logger.warning("Lock acquisition timed out, force-closing port")
with contextlib.suppress(Exception):
self._proto.disconnect()
self._connected = False
self._menu = Menu.UNKNOWN
self._cancel.clear()
return
try:
with contextlib.suppress(Exception):
self._go_to_root()
self._proto.disconnect()
self._connected = False
self._menu = Menu.UNKNOWN
self._cancel.clear()
finally:
self._lock.release()
@property
def is_connected(self) -> bool:
return self._connected and self._proto.is_connected
def initialize(self, skip_init: bool = False) -> None:
"""Prepare the dish for motor commands.
Args:
skip_init: If True, skip the protocol initialize step
(useful when re-connecting to an already-running dish).
"""
with self._lock:
if not skip_init:
self._proto.initialize()
self._menu = Menu.MOT # initialize() ends in MOT>
# else: leave _menu as whatever connect() detected
# ------------------------------------------------------------------
# Motor (MOT>)
# ------------------------------------------------------------------
def get_position(self) -> dict[str, float]:
"""Query current AZ/EL position.
Returns:
``{"azimuth": float, "elevation": float}``
"""
with self._lock:
self._ensure_menu(Menu.MOT)
response = self._send("a")
az_m = re.search(r"Angle\[0\]\s*=\s*(-?\d+\.?\d*)", response)
el_m = re.search(r"Angle\[1\]\s*=\s*(-?\d+\.?\d*)", response)
if not az_m or not el_m:
raise ValueError(f"Could not parse position: {response!r}")
return {
"azimuth": float(az_m.group(1)),
"elevation": float(el_m.group(1)),
}
def move_to(self, az: float, el: float) -> None:
"""Move the dish to an absolute AZ/EL position."""
with self._lock:
self._ensure_menu(Menu.MOT)
self._send(f"a 0 {az}")
self._send(f"a 1 {el}")
def move_motor(self, motor_id: int, degrees: float) -> None:
"""Move a single motor to an absolute position."""
with self._lock:
self._ensure_menu(Menu.MOT)
self._send(f"a {motor_id} {degrees}")
def home_motor(self, motor_id: int) -> None:
"""Home a motor to its reference position."""
with self._lock:
self._ensure_menu(Menu.MOT)
self._send(f"h {motor_id}")
def engage(self) -> None:
"""Engage (energize) the stepper motors."""
with self._lock:
self._ensure_menu(Menu.MOT)
self._send("e")
def release(self) -> None:
"""Release (de-energize) the stepper motors."""
with self._lock:
self._ensure_menu(Menu.MOT)
self._send("r")
def get_motor_list(self) -> str:
"""List motors and their state."""
with self._lock:
self._ensure_menu(Menu.MOT)
return self._send("l")
def get_motor_dynamics(self) -> dict[str, float]:
"""Read max velocity and acceleration for both axes.
Returns:
``{"az_max_vel": float, "el_max_vel": float,
"az_accel": float, "el_accel": float}``
"""
with self._lock:
self._ensure_menu(Menu.MOT)
mv_resp = self._send("mv")
ma_resp = self._send("ma")
result: dict[str, float] = {
"az_max_vel": 0.0,
"el_max_vel": 0.0,
"az_accel": 0.0,
"el_accel": 0.0,
}
# mv -> "Max Vel [0] = 65.0 Max Vel [1] = 45.0" or similar
vel_matches = re.findall(r"Max Vel\s*\[(\d)\]\s*=\s*(-?\d+\.?\d*)", mv_resp)
for motor_id, val in vel_matches:
if motor_id == "0":
result["az_max_vel"] = float(val)
elif motor_id == "1":
result["el_max_vel"] = float(val)
# ma -> "Accel[0] = 400.0 Accel[1] = 400.0"
acc_matches = re.findall(r"Accel\[(\d)\]\s*=\s*(-?\d+\.?\d*)", ma_resp)
for motor_id, val in acc_matches:
if motor_id == "0":
result["az_accel"] = float(val)
elif motor_id == "1":
result["el_accel"] = float(val)
return result
def set_max_velocity(self, motor_id: int, deg_per_sec: float) -> None:
"""Set max velocity for a motor axis (deg/s)."""
with self._lock:
self._ensure_menu(Menu.MOT)
self._send(f"mv {motor_id} {deg_per_sec:.1f}")
def set_max_acceleration(self, motor_id: int, accel: float) -> None:
"""Set max acceleration for a motor axis.
Firmware: MOT> ma [motor] [accel] (deg/s^2).
"""
with self._lock:
self._ensure_menu(Menu.MOT)
self._send(f"ma {motor_id} {accel:.1f}")
def get_motor_life(self) -> str:
"""Read motor lifetime / usage statistics."""
with self._lock:
self._ensure_menu(Menu.MOT)
return self._send("life")
def get_el_limits(self) -> dict[str, float]:
"""Read elevation min, max, and home angles.
Firmware returns centidegrees: ``Min: 1800 Max: 6500 Home: 6500``
Returns:
``{"min": 18.0, "max": 65.0, "home": 65.0}``
"""
with self._lock:
self._ensure_menu(Menu.MOT)
response = self._send("elminmaxhome")
result: dict[str, float] = {"min": 0.0, "max": 0.0, "home": 0.0}
min_m = re.search(r"Min:\s*(\d+)", response)
max_m = re.search(r"Max:\s*(\d+)", response)
home_m = re.search(r"Home:\s*(\d+)", response)
if min_m:
result["min"] = int(min_m.group(1)) / 100.0
if max_m:
result["max"] = int(max_m.group(1)) / 100.0
if home_m:
result["home"] = int(home_m.group(1)) / 100.0
return result
def get_step_positions(self) -> dict[str, int]:
"""Read raw step positions for both axes.
Firmware returns: ``Position[0] = 19998 Position[1] = 3116``
Returns:
``{"az_steps": int, "el_steps": int}``
"""
with self._lock:
self._ensure_menu(Menu.MOT)
response = self._send("p")
result: dict[str, int] = {"az_steps": 0, "el_steps": 0}
matches = re.findall(r"Position\[(\d)\]\s*=\s*(-?\d+)", response)
for motor_id, val in matches:
if motor_id == "0":
result["az_steps"] = int(val)
elif motor_id == "1":
result["el_steps"] = int(val)
return result
def get_pid_gains(self) -> dict[str, dict[str, float]]:
"""Read PID gains for both motor axes.
Firmware returns: ``Kp=600 Kv=60 Ki=1`` per motor.
Returns:
``{"az": {"kp": 600, "kv": 60, "ki": 1},
"el": {"kp": 250, "kv": 50, "ki": 1}}``
"""
with self._lock:
self._ensure_menu(Menu.MOT)
response = self._send("pid")
# Parse "Kp=600 Kv=60 Ki=1" patterns. The pid command without args
# shows both motors. We look for two sets of Kp/Kv/Ki values.
kp_matches = re.findall(r"Kp[=:]?\s*(\d+)", response)
kv_matches = re.findall(r"Kv[=:]?\s*(\d+)", response)
ki_matches = re.findall(r"Ki[=:]?\s*(\d+)", response)
result = {
"az": {"kp": 600.0, "kv": 60.0, "ki": 1.0},
"el": {"kp": 250.0, "kv": 50.0, "ki": 1.0},
}
if len(kp_matches) >= 2:
result["az"]["kp"] = float(kp_matches[0])
result["el"]["kp"] = float(kp_matches[1])
elif len(kp_matches) == 1:
result["az"]["kp"] = float(kp_matches[0])
if len(kv_matches) >= 2:
result["az"]["kv"] = float(kv_matches[0])
result["el"]["kv"] = float(kv_matches[1])
elif len(kv_matches) == 1:
result["az"]["kv"] = float(kv_matches[0])
if len(ki_matches) >= 2:
result["az"]["ki"] = float(ki_matches[0])
result["el"]["ki"] = float(ki_matches[1])
elif len(ki_matches) == 1:
result["az"]["ki"] = float(ki_matches[0])
return result
def set_pid_gains(self, motor_id: int, kp: float, kv: float, ki: float) -> None:
"""Write PID gains for a single motor axis.
Args:
motor_id: 0 for AZ, 1 for EL.
kp: Proportional gain.
kv: Velocity gain.
ki: Integral gain.
"""
with self._lock:
self._ensure_menu(Menu.MOT)
self._send(f"pid {motor_id} {int(kp)} {int(kv)} {int(ki)}")
def az_sweep_firmware(
self,
start_az: float,
span: float,
step_cdeg: int,
num_xponders: int,
timeout: float = 120,
) -> list[dict[str, float]]:
"""Execute a firmware-accelerated AZ sweep via azscanwxp.
Moves to *start_az* first, then runs the firmware sweep command which
handles motor movement and RSSI measurement atomically — no per-point
serial round-trips.
Args:
start_az: Starting azimuth in degrees.
span: Total sweep width in degrees.
step_cdeg: Step size in centidegrees (100 = 1.00 deg).
num_xponders: Number of transponders to cycle per position.
timeout: Serial read timeout for the long-running command.
Returns:
List of dicts with keys: az, rssi, lock, snr.
"""
with self._lock:
self._ensure_menu(Menu.MOT)
# Move to start position and wait for prompt.
self._send(f"a 0 {start_az}")
# Execute firmware sweep with extended timeout.
# Pass cancel event so disconnect() can interrupt the read.
response = self._proto.send_with_timeout(
f"azscanwxp 0 {span} {step_cdeg} {num_xponders}",
timeout=timeout,
cancel=self._cancel,
)
# Parse streaming output lines.
# Motor:<id> Angle:<cdeg> RSSI:<adc> Lock:<0/1> SNR:<dB>
results: list[dict[str, float]] = []
for match in re.finditer(
r"Angle:(-?\d+)\s+RSSI:(\d+)\s+Lock:(\d)\s+SNR:(-?\d+\.?\d*)",
response,
):
results.append(
{
"az": int(match.group(1)) / 100.0,
"rssi": float(match.group(2)),
"lock": float(match.group(3)),
"snr": float(match.group(4)),
}
)
logger.info("Firmware sweep returned %d points", len(results))
return results
# ------------------------------------------------------------------
# Signal (DVB>)
# ------------------------------------------------------------------
def get_rssi(self, iterations: int = 10) -> dict[str, int]:
"""Read averaged RSSI signal strength.
Returns:
``{"reads": int, "average": int, "current": int}``
"""
with self._lock:
self._ensure_menu(Menu.DVB)
response = self._send(f"rssi {iterations}")
match = re.search(
r"Reads:(\d+)\s+RSSI\[avg:\s*(\d+)\s+cur:\s*(\d+)\]",
response,
)
if match:
return {
"reads": int(match.group(1)),
"average": int(match.group(2)),
"current": int(match.group(3)),
}
raise ValueError(f"Could not parse RSSI: {response!r}")
def enable_lna(self) -> None:
"""Enable LNA in ODU mode (13V). Alias for set_lnb_voltage('odu')."""
self.set_lnb_voltage("odu")
def set_lnb_voltage(self, mode: str) -> str:
"""Set LNB DC voltage mode.
Args:
mode: 'odu' for 13V (V-pol, LNA enabled) or 'stb' for 18V (H-pol).
Returns:
Raw firmware response.
"""
mode = mode.strip().lower()
if mode not in ("odu", "stb"):
raise ValueError(
f"Invalid LNB mode {mode!r}: use 'odu' (13V) or 'stb' (18V)"
)
with self._lock:
self._ensure_menu(Menu.DVB)
return self._send(f"lnbdc {mode}")
def get_lock_status(self) -> str:
"""Read quick lock status (single-shot)."""
with self._lock:
self._ensure_menu(Menu.DVB)
return self._send("qls")
def get_dvb_config(self) -> str:
"""Read BCM hardware/firmware version."""
with self._lock:
self._ensure_menu(Menu.DVB)
return self._send("config")
def get_channel_params(self) -> str:
"""Read current channel parameters."""
with self._lock:
self._ensure_menu(Menu.DVB)
return self._send("dis")
# ------------------------------------------------------------------
# A3981
# ------------------------------------------------------------------
def get_a3981_diag(self) -> str:
"""Read A3981 diagnostic/fault status."""
with self._lock:
self._ensure_menu(Menu.A3981)
return self._send("diag")
def get_a3981_modes(self) -> dict[str, str]:
"""Read A3981 step mode, current mode, and step size.
Returns:
``{"step_mode": str, "current_mode": str, "step_size": str}``
"""
with self._lock:
self._ensure_menu(Menu.A3981)
sm_resp = self._send("sm")
cm_resp = self._send("cm")
ss_resp = self._send("ss")
return {
"step_mode": sm_resp,
"current_mode": cm_resp,
"step_size": ss_resp,
}
def get_a3981_torque(self) -> str:
"""Read A3981 torque levels."""
with self._lock:
self._ensure_menu(Menu.A3981)
return self._send("st")
# ------------------------------------------------------------------
# NVS
# ------------------------------------------------------------------
def nvs_dump(self) -> str:
"""Dump all NVS values."""
with self._lock:
self._ensure_menu(Menu.NVS)
return self._send("d")
def nvs_read(self, index: int) -> str:
"""Read a single NVS value by index."""
with self._lock:
self._ensure_menu(Menu.NVS)
return self._send(f"e {index}")
# ------------------------------------------------------------------
# ADC
# ------------------------------------------------------------------
def get_adc_rssi(self) -> str:
"""Read single-shot ADC RSSI value."""
with self._lock:
self._ensure_menu(Menu.ADC)
return self._send("rssi")
def get_board_id(self) -> str:
"""Read board identification string."""
with self._lock:
self._ensure_menu(Menu.ADC)
return self._send("bdid")
# ------------------------------------------------------------------
# OS
# ------------------------------------------------------------------
def get_firmware_id(self) -> str:
"""Read full MCU and firmware identification."""
with self._lock:
self._ensure_menu(Menu.OS)
return self._send("id")
# ------------------------------------------------------------------
# Raw / Console
# ------------------------------------------------------------------
def send_raw(self, cmd: str) -> str:
"""Send an arbitrary command and return the raw response.
After a raw command, the menu state is marked UNKNOWN because
the user may have navigated to a different submenu.
"""
with self._lock:
response = self._send(cmd)
self._menu = Menu.UNKNOWN
return response