diff --git a/src/birdcage/protocol.py b/src/birdcage/protocol.py index dc7bbc7..c03885e 100644 --- a/src/birdcage/protocol.py +++ b/src/birdcage/protocol.py @@ -392,6 +392,10 @@ class CarryoutG2Protocol(FirmwareProtocol): raise ValueError(f"Could not parse RSSI from: {response!r}") + def send_raw(self, cmd: str) -> str: + """Send arbitrary command, return raw prompt-terminated response.""" + return self._send(cmd) + def quit_submenu(self) -> None: """Exit current submenu and return to parent.""" self._send("q") diff --git a/tui/pyproject.toml b/tui/pyproject.toml new file mode 100644 index 0000000..85f6d7e --- /dev/null +++ b/tui/pyproject.toml @@ -0,0 +1,31 @@ +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[project] +name = "birdcage-tui" +version = "2026.02.13" +description = "Textual TUI for Winegard Carryout G2 satellite dish control" +license = "MIT" +requires-python = ">=3.11" +authors = [{name = "Ryan Malloy", email = "ryan@supported.systems"}] +dependencies = [ + "birdcage", + "textual>=1.0.0", +] + +[project.scripts] +birdcage-tui = "birdcage_tui.app:main" + +[tool.uv.sources] +birdcage = { path = ".." } + +[tool.ruff] +target-version = "py311" +src = ["src"] + +[tool.ruff.lint] +select = ["E", "F", "I", "UP", "B", "SIM"] + +[tool.hatch.build.targets.wheel] +packages = ["src/birdcage_tui"] diff --git a/tui/src/birdcage_tui/__init__.py b/tui/src/birdcage_tui/__init__.py new file mode 100644 index 0000000..d5dfabc --- /dev/null +++ b/tui/src/birdcage_tui/__init__.py @@ -0,0 +1 @@ +"""Birdcage TUI — Textual interface for Winegard satellite dish control.""" diff --git a/tui/src/birdcage_tui/app.py b/tui/src/birdcage_tui/app.py new file mode 100644 index 0000000..1349e63 --- /dev/null +++ b/tui/src/birdcage_tui/app.py @@ -0,0 +1,165 @@ +"""Birdcage TUI — main application shell. + +ContentSwitcher-based layout with sidebar navigation (F1-F5), +device status bar, and five swappable screen panels. +""" + +import argparse +import logging + +from textual.app import App, ComposeResult +from textual.binding import Binding +from textual.containers import Horizontal, Vertical +from textual.widgets import Button, ContentSwitcher, Footer, Header, Static + +from birdcage_tui.screens.console import ConsoleScreen +from birdcage_tui.screens.position import PositionScreen +from birdcage_tui.screens.scan import ScanScreen +from birdcage_tui.screens.signal import SignalScreen +from birdcage_tui.screens.system import SystemScreen +from birdcage_tui.widgets.device_status_bar import DeviceStatusBar + +log = logging.getLogger(__name__) + +MODES: dict[str, tuple[str, type]] = { + "position": ("F1 Position", PositionScreen), + "signal": ("F2 Signal", SignalScreen), + "scan": ("F3 Scan", ScanScreen), + "system": ("F4 System", SystemScreen), + "console": ("F5 Console", ConsoleScreen), +} + + +class BirdcageApp(App): + """Textual application for Winegard satellite dish control.""" + + TITLE = "Birdcage" + CSS_PATH = "theme.tcss" + + BINDINGS = [ + Binding("f1", "switch_mode('position')", "Position"), + Binding("f2", "switch_mode('signal')", "Signal"), + Binding("f3", "switch_mode('scan')", "Scan"), + Binding("f4", "switch_mode('system')", "System"), + Binding("f5", "switch_mode('console')", "Console"), + Binding("q", "quit", "Quit"), + Binding("d", "toggle_dark", "Dark"), + ] + + # Set from CLI args before run() + demo_mode: bool = False + serial_port: str = "/dev/ttyUSB0" + firmware_name: str = "g2" + skip_init: bool = False + device: object = None + + @property + def SUB_TITLE(self) -> str: # noqa: N802 + if self.demo_mode: + return "DEMO" + return self.serial_port + + def compose(self) -> ComposeResult: + yield Header() + with Horizontal(id="main-area"): + with Vertical(id="sidebar"): + yield Static("\U0001f6f0\ufe0f Birdcage", classes="sidebar-title") + yield Static("Carryout G2", classes="sidebar-subtitle") + for mode_key, (label, _) in MODES.items(): + yield Button(label, id=f"btn-{mode_key}", classes="sidebar-btn") + yield DeviceStatusBar(id="device-status") + with ContentSwitcher(id="content-area", initial="position"): + for mode_key, (_, screen_cls) in MODES.items(): + yield screen_cls(id=mode_key) + yield Footer() + + def on_mount(self) -> None: + self.query_one("#btn-position").add_class("active") + self._setup_device() + + def _setup_device(self) -> None: + """Create device (demo or real) and hand it to each screen.""" + if self.demo_mode: + from birdcage_tui.demo import DemoDevice + + self.device = DemoDevice() + self.device.connect() + else: + from birdcage.protocol import get_protocol + + from birdcage_tui.bridge import SerialBridge + + protocol = get_protocol(self.firmware_name) + self.device = SerialBridge(protocol) + self.device.connect(self.serial_port) + if not self.skip_init: + self.run_worker(self._initialize_device, thread=True) + + self._distribute_device() + + async def _initialize_device(self) -> None: + """Run device init in a worker thread (blocks on serial I/O).""" + try: + self.device.initialize() + except Exception: + log.exception("Device initialization failed") + self.notify("Init failed -- check serial connection", severity="error") + + def _distribute_device(self) -> None: + """Pass the device reference to every screen that wants it.""" + for mode_key in MODES: + screen = self.query_one(f"#{mode_key}") + if hasattr(screen, "set_device"): + screen.set_device(self.device) + + status_bar = self.query_one("#device-status", DeviceStatusBar) + if hasattr(status_bar, "set_device"): + status_bar.set_device(self.device) + + def action_switch_mode(self, mode: str) -> None: + """Switch the content area to *mode* and update sidebar highlight.""" + switcher = self.query_one("#content-area", ContentSwitcher) + switcher.current = mode + + for btn in self.query(".sidebar-btn"): + btn.remove_class("active") + self.query_one(f"#btn-{mode}").add_class("active") + + screen = self.query_one(f"#{mode}") + if hasattr(screen, "on_show"): + screen.on_show() + + def action_toggle_dark(self) -> None: + self.dark = not self.dark + + def on_button_pressed(self, event: Button.Pressed) -> None: + button_id = event.button.id or "" + if button_id.startswith("btn-"): + mode = button_id.removeprefix("btn-") + if mode in MODES: + self.action_switch_mode(mode) + + +def main() -> None: + parser = argparse.ArgumentParser( + description="Birdcage TUI -- Satellite Dish Control" + ) + parser.add_argument("--demo", action="store_true", help="Run with simulated device") + parser.add_argument("--port", default="/dev/ttyUSB0", help="Serial port") + parser.add_argument( + "--firmware", + default="g2", + choices=["g2", "hal205", "hal000"], + help="Firmware version", + ) + parser.add_argument( + "--skip-init", action="store_true", help="Skip firmware initialization" + ) + args = parser.parse_args() + + app = BirdcageApp() + app.demo_mode = args.demo + app.serial_port = args.port + app.firmware_name = args.firmware + app.skip_init = args.skip_init + app.run() diff --git a/tui/src/birdcage_tui/bridge.py b/tui/src/birdcage_tui/bridge.py new file mode 100644 index 0000000..a90e218 --- /dev/null +++ b/tui/src/birdcage_tui/bridge.py @@ -0,0 +1,439 @@ +"""Thread-safe bridge between Birdcage TUI and CarryoutG2Protocol. + +Wraps all serial I/O in a threading.Lock so the TUI's worker threads +don't stomp on each other. Tracks the current firmware submenu to +minimize unnecessary q-then-reenter transitions. +""" + +import contextlib +import logging +import re +import threading +from enum import Enum, auto + +from birdcage.protocol import CarryoutG2Protocol + +logger = logging.getLogger(__name__) + + +class Menu(Enum): + """Firmware submenu states.""" + + ROOT = auto() + MOT = auto() + DVB = auto() + NVS = auto() + A3981 = auto() + ADC = auto() + OS = auto() + STEP = auto() + PEAK = auto() + EEPROM = auto() + GPIO = auto() + LATLON = auto() + DIPSWITCH = auto() + UNKNOWN = auto() + + +# Map Menu enum to the command that enters it from root. +_MENU_COMMANDS: dict[Menu, str] = { + Menu.MOT: "mot", + Menu.DVB: "dvb", + Menu.NVS: "nvs", + Menu.A3981: "a3981", + Menu.ADC: "adc", + Menu.OS: "os", + Menu.STEP: "step", + Menu.PEAK: "peak", + Menu.EEPROM: "eeprom", + Menu.GPIO: "gpio", + Menu.LATLON: "latlon", + Menu.DIPSWITCH: "dipswitch", +} + + +class SerialBridge: + """Thread-safe wrapper around CarryoutG2Protocol for TUI consumption. + + All public methods acquire a lock before touching the serial port. + The bridge tracks the current firmware submenu so it can skip + redundant quit-and-reenter cycles. + """ + + def __init__(self, protocol: CarryoutG2Protocol) -> None: + self._proto = protocol + self._lock = threading.Lock() + self._menu = Menu.UNKNOWN + self._connected = False + + # ------------------------------------------------------------------ + # Internal helpers + # ------------------------------------------------------------------ + + def _send(self, cmd: str) -> str: + """Send a command via the protocol's prompt-terminated send. + + Caller must hold ``_lock``. + """ + return self._proto.send_raw(cmd) + + def _go_to_root(self) -> None: + """Return to TRK> root menu. Caller must hold ``_lock``.""" + self._proto.reset_to_root() + self._menu = Menu.ROOT + + def _ensure_menu(self, target: Menu) -> None: + """Navigate to *target* submenu if not already there. + + Caller must hold ``_lock``. + """ + if self._menu == target: + return + + # Always go back to root first — we don't know how to go + # directly between arbitrary submenus. + if self._menu != Menu.ROOT: + self._go_to_root() + + if target == Menu.ROOT: + return + + cmd = _MENU_COMMANDS.get(target) + if cmd is None: + raise ValueError(f"No entry command for menu {target!r}") + + self._send(cmd) + self._menu = target + + # ------------------------------------------------------------------ + # Connection + # ------------------------------------------------------------------ + + def connect(self, port: str, baudrate: int = 115200) -> None: + """Open the RS-422 serial connection.""" + with self._lock: + self._proto.connect(port, baudrate) + self._connected = True + self._menu = Menu.UNKNOWN + + def disconnect(self) -> None: + """Close the serial connection.""" + with self._lock: + with contextlib.suppress(Exception): + self._go_to_root() + self._proto.disconnect() + self._connected = False + self._menu = Menu.UNKNOWN + + @property + def is_connected(self) -> bool: + return self._connected and self._proto.is_connected + + def initialize(self, skip_init: bool = False) -> None: + """Prepare the dish for motor commands. + + Args: + skip_init: If True, skip the protocol initialize step + (useful when re-connecting to an already-running dish). + """ + with self._lock: + if not skip_init: + self._proto.initialize() + self._menu = Menu.MOT # initialize() ends in MOT> + + # ------------------------------------------------------------------ + # Motor (MOT>) + # ------------------------------------------------------------------ + + def get_position(self) -> dict[str, float]: + """Query current AZ/EL position. + + Returns: + ``{"azimuth": float, "elevation": float}`` + """ + with self._lock: + self._ensure_menu(Menu.MOT) + response = self._send("a") + + az_m = re.search(r"Angle\[0\]\s*=\s*(-?\d+\.?\d*)", response) + el_m = re.search(r"Angle\[1\]\s*=\s*(-?\d+\.?\d*)", response) + + if not az_m or not el_m: + raise ValueError(f"Could not parse position: {response!r}") + + return { + "azimuth": float(az_m.group(1)), + "elevation": float(el_m.group(1)), + } + + def move_to(self, az: float, el: float) -> None: + """Move the dish to an absolute AZ/EL position.""" + with self._lock: + self._ensure_menu(Menu.MOT) + self._send(f"a 0 {az}") + self._send(f"a 1 {el}") + + def move_motor(self, motor_id: int, degrees: float) -> None: + """Move a single motor to an absolute position.""" + with self._lock: + self._ensure_menu(Menu.MOT) + self._send(f"a {motor_id} {degrees}") + + def home_motor(self, motor_id: int) -> None: + """Home a motor to its reference position.""" + with self._lock: + self._ensure_menu(Menu.MOT) + self._send(f"h {motor_id}") + + def engage(self) -> None: + """Engage (energize) the stepper motors.""" + with self._lock: + self._ensure_menu(Menu.MOT) + self._send("e") + + def release(self) -> None: + """Release (de-energize) the stepper motors.""" + with self._lock: + self._ensure_menu(Menu.MOT) + self._send("r") + + def get_motor_list(self) -> str: + """List motors and their state.""" + with self._lock: + self._ensure_menu(Menu.MOT) + return self._send("l") + + def get_motor_dynamics(self) -> dict[str, float]: + """Read max velocity and acceleration for both axes. + + Returns: + ``{"az_max_vel": float, "el_max_vel": float, + "az_accel": float, "el_accel": float}`` + """ + with self._lock: + self._ensure_menu(Menu.MOT) + mv_resp = self._send("mv") + ma_resp = self._send("ma") + + result: dict[str, float] = { + "az_max_vel": 0.0, + "el_max_vel": 0.0, + "az_accel": 0.0, + "el_accel": 0.0, + } + + # mv → "Max Vel [0] = 65.0 Max Vel [1] = 45.0" or similar + vel_matches = re.findall(r"Max Vel\s*\[(\d)\]\s*=\s*(-?\d+\.?\d*)", mv_resp) + for motor_id, val in vel_matches: + if motor_id == "0": + result["az_max_vel"] = float(val) + elif motor_id == "1": + result["el_max_vel"] = float(val) + + # ma → "Accel[0] = 400.0 Accel[1] = 400.0" + acc_matches = re.findall(r"Accel\[(\d)\]\s*=\s*(-?\d+\.?\d*)", ma_resp) + for motor_id, val in acc_matches: + if motor_id == "0": + result["az_accel"] = float(val) + elif motor_id == "1": + result["el_accel"] = float(val) + + return result + + def get_motor_life(self) -> str: + """Read motor lifetime / usage statistics.""" + with self._lock: + self._ensure_menu(Menu.MOT) + return self._send("life") + + def get_el_limits(self) -> dict[str, float]: + """Read elevation min, max, and home angles. + + Firmware returns centidegrees: ``Min: 1800 Max: 6500 Home: 6500`` + + Returns: + ``{"min": 18.0, "max": 65.0, "home": 65.0}`` + """ + with self._lock: + self._ensure_menu(Menu.MOT) + response = self._send("elminmaxhome") + + result: dict[str, float] = {"min": 0.0, "max": 0.0, "home": 0.0} + + min_m = re.search(r"Min:\s*(\d+)", response) + max_m = re.search(r"Max:\s*(\d+)", response) + home_m = re.search(r"Home:\s*(\d+)", response) + + if min_m: + result["min"] = int(min_m.group(1)) / 100.0 + if max_m: + result["max"] = int(max_m.group(1)) / 100.0 + if home_m: + result["home"] = int(home_m.group(1)) / 100.0 + + return result + + def get_step_positions(self) -> dict[str, int]: + """Read raw step positions for both axes. + + Firmware returns: ``Position[0] = 19998 Position[1] = 3116`` + + Returns: + ``{"az_steps": int, "el_steps": int}`` + """ + with self._lock: + self._ensure_menu(Menu.MOT) + response = self._send("p") + + result: dict[str, int] = {"az_steps": 0, "el_steps": 0} + + matches = re.findall(r"Position\[(\d)\]\s*=\s*(-?\d+)", response) + for motor_id, val in matches: + if motor_id == "0": + result["az_steps"] = int(val) + elif motor_id == "1": + result["el_steps"] = int(val) + + return result + + # ------------------------------------------------------------------ + # Signal (DVB>) + # ------------------------------------------------------------------ + + def get_rssi(self, iterations: int = 10) -> dict[str, int]: + """Read averaged RSSI signal strength. + + Returns: + ``{"reads": int, "average": int, "current": int}`` + """ + with self._lock: + self._ensure_menu(Menu.DVB) + response = self._send(f"rssi {iterations}") + + match = re.search( + r"Reads:(\d+)\s+RSSI\[avg:\s*(\d+)\s+cur:\s*(\d+)\]", + response, + ) + if match: + return { + "reads": int(match.group(1)), + "average": int(match.group(2)), + "current": int(match.group(3)), + } + + raise ValueError(f"Could not parse RSSI: {response!r}") + + def enable_lna(self) -> None: + """Enable LNA in ODU mode (sets LNB to 13V).""" + with self._lock: + self._ensure_menu(Menu.DVB) + self._send("lnbdc odu") + + def get_lock_status(self) -> str: + """Read quick lock status (single-shot).""" + with self._lock: + self._ensure_menu(Menu.DVB) + return self._send("qls") + + def get_dvb_config(self) -> str: + """Read BCM hardware/firmware version.""" + with self._lock: + self._ensure_menu(Menu.DVB) + return self._send("config") + + def get_channel_params(self) -> str: + """Read current channel parameters.""" + with self._lock: + self._ensure_menu(Menu.DVB) + return self._send("dis") + + # ------------------------------------------------------------------ + # A3981 + # ------------------------------------------------------------------ + + def get_a3981_diag(self) -> str: + """Read A3981 diagnostic/fault status.""" + with self._lock: + self._ensure_menu(Menu.A3981) + return self._send("diag") + + def get_a3981_modes(self) -> dict[str, str]: + """Read A3981 step mode, current mode, and step size. + + Returns: + ``{"step_mode": str, "current_mode": str, "step_size": str}`` + """ + with self._lock: + self._ensure_menu(Menu.A3981) + sm_resp = self._send("sm") + cm_resp = self._send("cm") + ss_resp = self._send("ss") + + return { + "step_mode": sm_resp, + "current_mode": cm_resp, + "step_size": ss_resp, + } + + def get_a3981_torque(self) -> str: + """Read A3981 torque levels.""" + with self._lock: + self._ensure_menu(Menu.A3981) + return self._send("st") + + # ------------------------------------------------------------------ + # NVS + # ------------------------------------------------------------------ + + def nvs_dump(self) -> str: + """Dump all NVS values.""" + with self._lock: + self._ensure_menu(Menu.NVS) + return self._send("d") + + def nvs_read(self, index: int) -> str: + """Read a single NVS value by index.""" + with self._lock: + self._ensure_menu(Menu.NVS) + return self._send(f"e {index}") + + # ------------------------------------------------------------------ + # ADC + # ------------------------------------------------------------------ + + def get_adc_rssi(self) -> str: + """Read single-shot ADC RSSI value.""" + with self._lock: + self._ensure_menu(Menu.ADC) + return self._send("rssi") + + def get_board_id(self) -> str: + """Read board identification string.""" + with self._lock: + self._ensure_menu(Menu.ADC) + return self._send("bdid") + + # ------------------------------------------------------------------ + # OS + # ------------------------------------------------------------------ + + def get_firmware_id(self) -> str: + """Read full MCU and firmware identification.""" + with self._lock: + self._ensure_menu(Menu.OS) + return self._send("id") + + # ------------------------------------------------------------------ + # Raw / Console + # ------------------------------------------------------------------ + + def send_raw(self, cmd: str) -> str: + """Send an arbitrary command and return the raw response. + + After a raw command, the menu state is marked UNKNOWN because + the user may have navigated to a different submenu. + """ + with self._lock: + response = self._send(cmd) + self._menu = Menu.UNKNOWN + return response diff --git a/tui/src/birdcage_tui/demo.py b/tui/src/birdcage_tui/demo.py new file mode 100644 index 0000000..e86ab88 --- /dev/null +++ b/tui/src/birdcage_tui/demo.py @@ -0,0 +1,631 @@ +"""Synthetic demo device for the Birdcage TUI. + +Drop-in replacement for SerialBridge that simulates a Winegard Carryout G2 +dish with motor movement, RSSI signal modeling, and canned firmware responses. +No serial hardware required. +""" + +import contextlib +import math +import random +import time +from enum import Enum, auto + + +class _DemoMenu(Enum): + """Simulated firmware submenu states.""" + + ROOT = auto() + MOT = auto() + DVB = auto() + NVS = auto() + A3981 = auto() + ADC = auto() + OS = auto() + STEP = auto() + PEAK = auto() + EEPROM = auto() + GPIO = auto() + LATLON = auto() + DIPSWITCH = auto() + + +# Complete NVS dump text from firmware 02.02.48 (captured 2026-02-12). +_NVS_DUMP_TEXT = """\ +Num Name Current Saved Default +---- -------------------------- ---------- ---------- ---------- + 0) Log ID's 0x00000007 0x00000007 0x00000007 + 1) Log Device 0x00000001 0x00000001 0x00000001 + 2) Debug 2nd Console Port 0 0 0 + 3) Debug 2nd Packet Port 0 0 0 + 4) Debug Port Connection 0 0 0 + 16) Pitch Deadband 0.00 0.00 0.00 + 17) Roll Deadband 0.00 0.00 0.00 + 18) Yaw Deadband 0.00 0.00 0.00 + 20) Disable Tracker Proc? TRUE TRUE FALSE + 21) Tracker Proc Run Mode 0 0 0 + 22) Conical Alpha Az 200 200 200 + 23) Conical Alpha El 200 200 200 + 24) Conical Radius 1.00 1.00 1.00 + 25) Conical Count Max 20 20 20 + 26) Conical Test Drift +0 +0 +0 + 27) Circle RPM 120 120 120 + 28) Circle Pts/Rev 6 6 6 + 32) Conical Az Clamp 8.00 8.00 8.00 + 33) Conical El Clamp 8.00 8.00 8.00 + 35) Motor Pts/Rev 72 72 72 + 36) Circle Az Radius 1.00 1.00 1.00 + 37) Circle El Radius 1.00 1.00 1.00 + 38) Sleep Mode Timer Secs 420 420 420 + 40) Motor Type 0 0 0 + 41) Satellite Scan Velocity 55.00 55.00 55.00 + 48) Motor Spiral Velocity 55.00 55.00 55.00 + 49) Motor Gear Ratio 0x00000000 0x00000000 0x00000000 + 63) GPS Heading Threshold 1.00 1.00 1.00 + 64) GPS Moving Threshold 5.00 MPH 5.00 MPH 5.00 MPH + 66) Spiral Signal In A Row Min +3 +3 +3 + 67) Spiral Signal In A Row Max +20 +20 +20 + 68) Signal Odd to Even Offset +0 +0 +0 + 69) Signal Offset 80 80 80 + 70) Signal Baseline Angle 65.00 65.00 65.00 + 71) Signal Re-Peak Degrade Percent 25 25 25 + 72) Gyro Sensitivity +1110 +1110 +1110 + 73) Gyro Filter Size +1 +1 +1 + 74) Gyro Calib Readings 100 100 100 + 75) Gyro Mount Type 1 1 1 + 76) Gyro Velocity Offset 4 4 4 + 77) Gyro Max Accel 600 600 600 + 80) AZ Max Vel 65.00 65.00 65.00 + 81) AZ Max Accel 400.00 400.00 400.00 + 82) AZ Home Velocity 55.00 55.00 55.00 + 83) AZ Steps/Rev 40000 40000 40000 + 84) AZ Direction +1 +1 +1 + 85) EL Max Vel 45.00 45.00 45.00 + 86) EL Max Accel 400.00 400.00 400.00 + 87) EL Home Velocity 45.00 45.00 45.00 + 88) EL Steps/Rev 24960 24960 24960 + 89) EL Direction +1 +1 +1 + 95) AZ Low current limit 0x0000ff0c 0x0000ff0c 0x0000ff0c + 96) AZ High current limit 0x0000ff30 0x0000ff30 0x0000ff30 + 97) EL Low current limit 0x0000ff0c 0x0000ff0c 0x0000ff0c + 98) EL High current limit 0x0000ff40 0x0000ff40 0x0000ff40 +101) Minimum Elevation Angle 18.00 18.00 18.00 +102) Maximum Elevation Angle 65.00 65.00 65.00 +103) Elevation Home Angle 65.00 65.00 65.00 +106) Az Stall Detect 78 78 78 +107) El Stall Detect 75 75 75 +108) Az Stall Samples 100 100 100 +109) El Stall Samples 100 100 100 +110) EL Home Current Limit 0x0000ff28 0x0000ff28 0x0000ff28 +111) AZ Home Current Limit 0x0000ff40 0x0000ff40 0x0000ff40 +112) Disable Dipswitch? FALSE FALSE FALSE +113) Dipswitch Value 101 101 101 +114) Dipswitch Front/Rear Mount 0 0 0 +115) Mount Offset Angle +0 +0 +0 +118) Signal Use LNB Clamp FALSE FALSE FALSE +128) AZ PID Kp +600 +600 +600 +129) AZ PID Kv +60 +60 +60 +130) AZ PID Ki +1 +1 +1 +131) EL PID Kp +250 +250 +250 +132) EL PID Kv +50 +50 +50 +133) EL PID Ki +1 +1 +1 +136) AZ PWM Stall Cnt 6 6 6 +137) EL PWM Stall Cnt 5 5 5 +143) Tracking Number 0 0 0""" + +# Parse NVS lines into a dict keyed by index for nvs_read(). +_NVS_LINES: dict[int, str] = {} +for _line in _NVS_DUMP_TEXT.splitlines(): + _line_stripped = _line.strip() + if _line_stripped and _line_stripped[0].isdigit(): + _idx_str = _line_stripped.split(")")[0].strip() + with contextlib.suppress(ValueError): + _NVS_LINES[int(_idx_str)] = _line_stripped + +# Firmware identification text matching ``os > id`` output. +_FIRMWARE_ID = """\ +NVS Version: 1.02.13 +System ID: TWELINCH + K60-144pin + Silicon Rev 2.4 + Mask Set 4N22D + 512 kBytes of P-flash + P-flash only + 128 kBytes of RAM + Board Rev ID: A + Board ID: STATIONARY + Ant ID: 12-IN G2 + Software version: 02.02.48 + CCLK: 96000000 + BCLK: 48000000 + Flash Base Address: 65536 + Flash Size: 458752""" + +_DVB_CONFIG = """\ +BCM Hardware= ID: 0x4515 VER: 0xB0 +BCM Firmware= MAJOR VER: 0x71 (113) MINOR VER: 0x25 (37) +BCM Strap Config: 0x25018""" + +_CHANNEL_PARAMS = """\ +Power Mode: ON +Search Transponders: ON +Auto Search Mode: 1 +Shuffle Mode: ON +Frequency List: Non-Stacked + +Num Parameter Current Default +1 Frequency 1090640 (kHz) 974000 (kHz) +2 Symbol Rate 0 (PeakScanEnabled) 20000 (ksps) +3 Trans_Mod_CRate blind_scan blind_scan +4 Blind Scan Mode ___trb_dvb_dss_____ ___trb_dvb_dss_____ +5 LNB Polarity ODU:13V --- +6 LNB Tone (ODU) off off +7 Roll-off 0.35 0.35 +8 LPF Cutoff 0 (auto) 0 (MHz) +9 Carrier Offset 0 (kHz) 0 (kHz) +10 FreqSearchRange 5000 (kHz) 5000 (kHz) +11 DCII Mode dcii_qpsk_comb dcii_qpsk_comb +12 Spectral Inv scan scan +13 PScnSymRtRngMin 18000 (ksps) 18000 (ksps) +14 PScnSymRtRngMax 24000 (ksps) 24000 (ksps) +15 SignalDetectMode off off""" + +_MOTOR_LIFE = """\ +AZ total moves: 847 +AZ total degrees: 52340.50 +EL total moves: 423 +EL total degrees: 18920.75 +Uptime hours: 312.4""" + +# Simulated satellite at AZ=200, EL=38 for RSSI modeling. +_SAT_AZ = 200.0 +_SAT_EL = 38.0 +_RSSI_NOISE_FLOOR = 500 +_RSSI_PEAK = 2000 +_RSSI_BEAM_WIDTH = 50.0 # Gaussian denominator (degrees squared) + +# Motor simulation speed (degrees per second). +_MOTOR_SPEED = 10.0 + + +class DemoDevice: + """Synthetic demo device implementing the same interface as SerialBridge. + + Simulates a Carryout G2 dish with motor movement, RSSI signal modeling, + and canned firmware responses. No serial hardware required. + """ + + def __init__(self) -> None: + self._connected = False + self._engaged = True + + # Current position and movement targets. + self._az = 180.0 + self._el = 45.0 + self._target_az = 180.0 + self._target_el = 45.0 + self._last_move_time = time.monotonic() + + # Submenu tracking for console simulation. + self._menu = _DemoMenu.ROOT + + # ------------------------------------------------------------------ + # Internal helpers + # ------------------------------------------------------------------ + + def _update_position(self) -> None: + """Interpolate position toward target at ~10 deg/s.""" + now = time.monotonic() + dt = now - self._last_move_time + self._last_move_time = now + + max_step = _MOTOR_SPEED * dt + + for axis in ("az", "el"): + current = getattr(self, f"_{axis}") + target = getattr(self, f"_target_{axis}") + delta = target - current + + if abs(delta) < 0.001: + continue + + if abs(delta) <= max_step: + # Arrived — add a tiny settling noise. + noise = random.gauss(0.0, 0.02) + setattr(self, f"_{axis}", target + noise) + else: + direction = 1.0 if delta > 0 else -1.0 + noise = random.gauss(0.0, 0.02) + setattr(self, f"_{axis}", current + direction * max_step + noise) + + def _compute_rssi(self) -> float: + """Gaussian signal model centered on the simulated satellite.""" + self._update_position() + dist_sq = (self._az - _SAT_AZ) ** 2 + (self._el - _SAT_EL) ** 2 + signal = _RSSI_PEAK * math.exp(-dist_sq / _RSSI_BEAM_WIDTH) + drift = math.sin(time.monotonic() / 60.0) * 50.0 + return _RSSI_NOISE_FLOOR + signal + drift + + @property + def _is_moving(self) -> bool: + return ( + abs(self._az - self._target_az) > 0.05 + or abs(self._el - self._target_el) > 0.05 + ) + + # ------------------------------------------------------------------ + # Connection + # ------------------------------------------------------------------ + + def connect(self, port: str = "/dev/demo", baudrate: int = 115200) -> None: + self._connected = True + self._menu = _DemoMenu.ROOT + + def disconnect(self) -> None: + self._connected = False + self._menu = _DemoMenu.ROOT + + @property + def is_connected(self) -> bool: + return self._connected + + def initialize(self, skip_init: bool = False) -> None: + self._connected = True + self._menu = _DemoMenu.MOT + + # ------------------------------------------------------------------ + # Motor (MOT>) + # ------------------------------------------------------------------ + + def get_position(self) -> dict[str, float]: + self._update_position() + return { + "azimuth": round(self._az, 2), + "elevation": round(self._el, 2), + } + + def move_to(self, az: float, el: float) -> None: + self._target_az = az + self._target_el = el + self._last_move_time = time.monotonic() + + def move_motor(self, motor_id: int, degrees: float) -> None: + if motor_id == 0: + self._target_az = degrees + elif motor_id == 1: + self._target_el = degrees + self._last_move_time = time.monotonic() + + def home_motor(self, motor_id: int) -> None: + if motor_id == 0: + self._target_az = 0.0 + elif motor_id == 1: + self._target_el = 65.0 + self._last_move_time = time.monotonic() + + def engage(self) -> None: + self._engaged = True + + def release(self) -> None: + self._engaged = False + + def get_motor_list(self) -> str: + return "Motors:\n 0 - AZIMUTH: local\n 1 - ELEVATION: local" + + def get_motor_dynamics(self) -> dict[str, float]: + return { + "az_max_vel": 65.0, + "el_max_vel": 45.0, + "az_accel": 400.0, + "el_accel": 400.0, + } + + def get_motor_life(self) -> str: + return _MOTOR_LIFE + + def get_el_limits(self) -> dict[str, float]: + return {"min": 18.0, "max": 65.0, "home": 65.0} + + def get_step_positions(self) -> dict[str, int]: + self._update_position() + return { + "az_steps": int(self._az * 40000 / 360), + "el_steps": int(self._el * 24960 / 360), + } + + # ------------------------------------------------------------------ + # Signal (DVB>) + # ------------------------------------------------------------------ + + def get_rssi(self, iterations: int = 10) -> dict[str, int]: + rssi = self._compute_rssi() + noise = random.gauss(0.0, 30.0) + return { + "reads": iterations, + "average": int(rssi), + "current": int(rssi + noise), + } + + def enable_lna(self) -> None: + pass # No-op in demo mode. + + def get_lock_status(self) -> str: + rssi = int(self._compute_rssi()) + locked = 1 if rssi > 1500 else 0 + return f"Lock:{locked} rssi:{rssi} cnt:0" + + def get_dvb_config(self) -> str: + return _DVB_CONFIG + + def get_channel_params(self) -> str: + return _CHANNEL_PARAMS + + # ------------------------------------------------------------------ + # A3981 + # ------------------------------------------------------------------ + + def get_a3981_diag(self) -> str: + return "AZ DIAG: OK\nEL DIAG: OK" + + def get_a3981_modes(self) -> dict[str, str]: + return { + "step_mode": "AZ Step Size Mode = AUTO\nEL Step Size Mode = AUTO", + "current_mode": "AZ: Mode = AUTO\nEL: Mode = AUTO", + "step_size": ( + "KEY: FULL-16, HALF-8, QTR-4, EIGHTH-2, SIXTEENTH-1\n" + "AZ Step Size:1\n" + "EL Step Size:1" + ), + } + + def get_a3981_torque(self) -> str: + if self._is_moving: + return "AZ Torq:HIGH\nEL Torq:HIGH" + return "AZ Torq:LOW\nEL Torq:LOW" + + # ------------------------------------------------------------------ + # NVS + # ------------------------------------------------------------------ + + def nvs_dump(self) -> str: + return _NVS_DUMP_TEXT + + def nvs_read(self, index: int) -> str: + line = _NVS_LINES.get(index) + if line: + return line + return f"NVS index {index} not found" + + # ------------------------------------------------------------------ + # ADC + # ------------------------------------------------------------------ + + def get_adc_rssi(self) -> str: + rssi = self._compute_rssi() + return str(int(rssi)) + + def get_board_id(self) -> str: + return "STATIONARY" + + # ------------------------------------------------------------------ + # OS + # ------------------------------------------------------------------ + + def get_firmware_id(self) -> str: + return _FIRMWARE_ID + + # ------------------------------------------------------------------ + # Raw / Console + # ------------------------------------------------------------------ + + def send_raw(self, cmd: str) -> str: + """Simulate firmware console with basic submenu tracking.""" + cmd_stripped = cmd.strip().lower() + + # Submenu navigation. + if cmd_stripped == "q": + self._menu = _DemoMenu.ROOT + return "TRK>" + + _enter_map: dict[str, _DemoMenu] = { + "mot": _DemoMenu.MOT, + "dvb": _DemoMenu.DVB, + "nvs": _DemoMenu.NVS, + "a3981": _DemoMenu.A3981, + "adc": _DemoMenu.ADC, + "os": _DemoMenu.OS, + "step": _DemoMenu.STEP, + "peak": _DemoMenu.PEAK, + "eeprom": _DemoMenu.EEPROM, + "gpio": _DemoMenu.GPIO, + "latlon": _DemoMenu.LATLON, + "dipswitch": _DemoMenu.DIPSWITCH, + } + + if cmd_stripped in _enter_map and self._menu == _DemoMenu.ROOT: + self._menu = _enter_map[cmd_stripped] + prompt = cmd_stripped.upper() + ">" + return prompt + + # Context-dependent responses. + if self._menu == _DemoMenu.MOT: + return self._handle_mot(cmd_stripped) + if self._menu == _DemoMenu.DVB: + return self._handle_dvb(cmd_stripped) + if self._menu == _DemoMenu.NVS: + return self._handle_nvs(cmd_stripped) + if self._menu == _DemoMenu.A3981: + return self._handle_a3981(cmd_stripped) + if self._menu == _DemoMenu.ADC: + return self._handle_adc(cmd_stripped) + if self._menu == _DemoMenu.OS: + return self._handle_os(cmd_stripped) + if self._menu == _DemoMenu.ROOT: + return self._handle_root(cmd_stripped) + + return f"Unknown command: {cmd}\nTRK>" + + def _handle_root(self, cmd: str) -> str: + if cmd in ("?", "help"): + return ( + "Available commands:\n" + " a3981 adc dipswitch dvb eeprom gpio\n" + " latlon mot nvs os peak step\n" + " q reboot stow\n" + "TRK>" + ) + if cmd == "reboot": + return "Rebooting...\nApplication Starting Kinetis PCB...\nTRK>" + return f"Unknown command: {cmd}\nTRK>" + + def _handle_mot(self, cmd: str) -> str: + if cmd in ("?", "help"): + return ( + "Available commands:\n" + " a azscan azscanwxp e ela2s elminmaxhome\n" + " els2a g h l life ma motorboth motorlife\n" + " mv p pid r sd sp sw v vms w\n" + "MOT>" + ) + if cmd == "a": + self._update_position() + return f" Angle[0] = {self._az:.2f}\n Angle[1] = {self._el:.2f}\nMOT>" + if cmd == "l": + return "Motors:\n 0 - AZIMUTH: local\n 1 - ELEVATION: local\nMOT>" + if cmd == "e": + self._engaged = True + return "Motors engaged\nMOT>" + if cmd == "r": + self._engaged = False + return "Motors released\nMOT>" + if cmd == "elminmaxhome": + return "Min: 1800 Max: 6500 Home: 6500\nMOT>" + if cmd == "life": + return _MOTOR_LIFE + "\nMOT>" + if cmd.startswith("a "): + parts = cmd.split() + if len(parts) >= 3: + motor_id = int(parts[1]) + degrees = float(parts[2]) + if motor_id == 0: + self._target_az = degrees + elif motor_id == 1: + self._target_el = degrees + self._last_move_time = time.monotonic() + return f" Angle = {degrees:.2f}\nMOT>" + return "Invalid parameters\nMOT>" + if cmd.startswith("h "): + parts = cmd.split() + if len(parts) >= 2: + motor_id = int(parts[1]) + self.home_motor(motor_id) + return f"Homing motor {motor_id}\nMOT>" + return "Invalid parameters\nMOT>" + if cmd == "mv": + return "Max Vel [0] = 65.0 Max Vel [1] = 45.0\nMOT>" + if cmd == "ma": + return "Accel[0] = 400.0 Accel[1] = 400.0\nMOT>" + if cmd == "p": + self._update_position() + az_steps = int(self._az * 40000 / 360) + el_steps = int(self._el * 24960 / 360) + return f"Position[0] = {az_steps} Position[1] = {el_steps}\nMOT>" + return f"Unknown command: {cmd}\nMOT>" + + def _handle_dvb(self, cmd: str) -> str: + if cmd in ("?", "help"): + return ( + "Available commands:\n" + " agc config def diag dis e freqs\n" + " lnbdc lnbv ls man msw nid pwr\n" + " qls range rssi shuf snr srch srch_mode\n" + " stats t table tablex tabto to\n" + "DVB>" + ) + if cmd.startswith("rssi"): + rssi_val = int(self._compute_rssi()) + parts = cmd.split() + iters = int(parts[1]) if len(parts) > 1 else 10 + noise = random.gauss(0.0, 30.0) + cur = int(rssi_val + noise) + return ( + f"iterations:{iters} interval(msec):20\n" + f" Reads:{iters} RSSI[avg: {rssi_val} cur: {cur}]\n" + "DVB>" + ) + if cmd == "config": + return _DVB_CONFIG + "\nDVB>" + if cmd == "dis": + return _CHANNEL_PARAMS + "\nDVB>" + if cmd == "lnbdc odu": + return "Enabled LNB ODU 13V\nDVB>" + if cmd == "qls": + rssi_val = int(self._compute_rssi()) + locked = 1 if rssi_val > 1500 else 0 + return f"Lock:{locked} rssi:{rssi_val} cnt:0\nDVB>" + return f"Unknown command: {cmd}\nDVB>" + + def _handle_nvs(self, cmd: str) -> str: + if cmd in ("?", "help"): + return "Available commands:\n d e s\nNVS>" + if cmd == "d": + return _NVS_DUMP_TEXT + "\nNVS>" + if cmd == "s": + return "NVS saved\nNVS>" + if cmd.startswith("e "): + parts = cmd.split() + if len(parts) >= 2: + try: + idx = int(parts[1]) + line = _NVS_LINES.get(idx) + if line: + return line + "\nNVS>" + return f"NVS index {idx} not found\nNVS>" + except ValueError: + pass + return "Invalid parameters\nNVS>" + return f"Unknown command: {cmd}\nNVS>" + + def _handle_a3981(self, cmd: str) -> str: + if cmd in ("?", "help"): + return "Available commands:\n cm diag reset sm ss st\nA3981>" + if cmd == "diag": + return "AZ DIAG: OK\nEL DIAG: OK\nA3981>" + if cmd == "sm": + return "AZ Step Size Mode = AUTO\nEL Step Size Mode = AUTO\nA3981>" + if cmd == "cm": + return "AZ: Mode = AUTO\nEL: Mode = AUTO\nA3981>" + if cmd == "ss": + return ( + "KEY: FULL-16, HALF-8, QTR-4, EIGHTH-2, SIXTEENTH-1\n" + "AZ Step Size:1\n" + "EL Step Size:1\n" + "A3981>" + ) + if cmd == "st": + if self._is_moving: + return "AZ Torq:HIGH\nEL Torq:HIGH\nA3981>" + return "AZ Torq:LOW\nEL Torq:LOW\nA3981>" + if cmd == "reset": + return "Az/El A3981 Faults Reset.\nA3981>" + return f"Unknown command: {cmd}\nA3981>" + + def _handle_adc(self, cmd: str) -> str: + if cmd in ("?", "help"): + return "Available commands:\n bdid bdrevid m rssi scan\nADC>" + if cmd == "rssi": + return str(int(self._compute_rssi())) + "\nADC>" + if cmd == "bdid": + return "STATIONARY\nADC>" + if cmd == "bdrevid": + return "A\nADC>" + return f"Unknown command: {cmd}\nADC>" + + def _handle_os(self, cmd: str) -> str: + if cmd in ("?", "help"): + return "Available commands:\n id reboot\nOS>" + if cmd == "id": + return _FIRMWARE_ID + "\nOS>" + if cmd == "reboot": + return "Rebooting...\nApplication Starting Kinetis PCB...\nTRK>" + return f"Unknown command: {cmd}\nOS>" diff --git a/tui/src/birdcage_tui/screens/__init__.py b/tui/src/birdcage_tui/screens/__init__.py new file mode 100644 index 0000000..5dd3480 --- /dev/null +++ b/tui/src/birdcage_tui/screens/__init__.py @@ -0,0 +1 @@ +"""TUI screen modules — one per F-key mode.""" diff --git a/tui/src/birdcage_tui/screens/console.py b/tui/src/birdcage_tui/screens/console.py new file mode 100644 index 0000000..2c1e22d --- /dev/null +++ b/tui/src/birdcage_tui/screens/console.py @@ -0,0 +1,232 @@ +"""F5 Console Screen -- raw serial terminal with color-coded prompts +and command history.""" + +import re + +from textual import work +from textual.app import ComposeResult +from textual.containers import Container, Horizontal +from textual.events import Key +from textual.widgets import Button, Input, Static + +from birdcage_tui.widgets.serial_log import SerialLog + +_KNOWN_PROMPTS = [ + "TRK>", + "MOT>", + "DVB>", + "NVS>", + "A3981>", + "ADC>", + "OS>", + "STEP>", + "PEAK>", + "EE>", + "GPIO>", + "LATLON>", + "DIPSWITCH>", +] + +# Pattern to detect NVS write commands: "nvs" ... "e " +# or just "e " when already in the NVS submenu. +_NVS_WRITE_RE = re.compile(r"e\s+\d+\s+\S+") + + +def _detect_prompt(text: str) -> str | None: + """Find the last known prompt in the response text.""" + last_prompt = None + last_pos = -1 + for prompt in _KNOWN_PROMPTS: + pos = text.rfind(prompt) + if pos > last_pos: + last_pos = pos + last_prompt = prompt + return last_prompt + + +class ConsoleScreen(Container): + """F5: Raw serial console for direct firmware interaction.""" + + def __init__(self, **kwargs) -> None: + super().__init__(**kwargs) + self._device: object | None = None + self._command_history: list[str] = [] + self._history_idx: int = 0 + self._cmd_count: int = 0 + self._prompt_ctx: str = "TRK>" + self._last_dangerous_cmd: str | None = None + + def compose(self) -> ComposeResult: + with Container(classes="screen-container"): + yield SerialLog(id="serial-log") + with Horizontal(classes="console-context"): + yield Static("Context: TRK>", id="console-context") + yield Static(" Commands: 0", id="console-cmd-count") + with Horizontal(classes="console-input-area"): + yield Static("> ", classes="label") + yield Input(placeholder="Enter command...", id="console-input") + yield Button("Send", id="btn-send", variant="primary") + + def set_device(self, device: object) -> None: + """Store the device reference and show a welcome message.""" + self._device = device + + serial_log = self.query_one("#serial-log", SerialLog) + + # Determine connection description. + if hasattr(device, "demo_mode") or type(device).__name__ == "DemoDevice": + mode_label = "DEMO" + else: + mode_label = getattr(device, "firmware_name", "Live") + + port = getattr(self.app, "serial_port", "/dev/ttyUSB0") if self.app else "---" + + serial_log.append_output("Birdcage Console -- type ? for help") + serial_log.append_output(f"Connected to: {mode_label} / {port}") + + def _check_dangerous(self, cmd: str) -> str | None: + """Return a warning message if the command is dangerous, or None if safe. + + If the same dangerous command is sent twice in a row, allow it through + (the user is insisting). + """ + stripped = cmd.strip() + lower = stripped.lower() + + # Same dangerous command repeated -- user is insisting. + is_repeat = ( + self._last_dangerous_cmd is not None + and stripped == self._last_dangerous_cmd + ) + if is_repeat: + self._last_dangerous_cmd = None + return None + + warning = None + + if lower == "q" and self._prompt_ctx == "TRK>": + warning = ( + "Warning: 'q' at root kills the shell! " + "Use submenu-level 'q' to exit submenus." + ) + elif lower == "reboot": + warning = "Warning: 'reboot' will restart the dish firmware." + elif _NVS_WRITE_RE.search(lower): + warning = "Warning: NVS write detected. Are you sure?" + + if warning is not None: + self._last_dangerous_cmd = stripped + else: + self._last_dangerous_cmd = None + + return warning + + def _do_send(self, cmd_text: str) -> None: + """Validate and dispatch a command. Called on Enter or Send button.""" + cmd_text = cmd_text.strip() + if not cmd_text: + return + + # Safety gate. + warning = self._check_dangerous(cmd_text) + if warning is not None: + self.notify(warning, severity="warning", timeout=5) + return + + # Record in history. + self._command_history.append(cmd_text) + self._history_idx = len(self._command_history) + + # Show the command in the log immediately. + serial_log = self.query_one("#serial-log", SerialLog) + serial_log.append_command(cmd_text) + + # Clear input right away so the user can type while waiting. + self.query_one("#console-input", Input).value = "" + + # Dispatch to worker thread (serial I/O blocks). + self._send_command(cmd_text) + + @work(thread=True) + def _send_command(self, cmd: str) -> None: + """Send the command over serial and update the UI with the response.""" + if self._device is None: + self.app.call_from_thread( + self.notify, "No device connected", severity="error" + ) + return + + try: + response = self._device.send_raw(cmd) + except Exception as exc: + self.app.call_from_thread(self._on_response, f"ERROR: {exc}", cmd) + return + + self.app.call_from_thread(self._on_response, response, cmd) + + def _on_response(self, response: str, cmd: str) -> None: + """Process the firmware response on the main thread.""" + serial_log = self.query_one("#serial-log", SerialLog) + serial_log.append_output(response) + + # Detect prompt context from the response. + detected = _detect_prompt(response) + if detected is not None: + self._prompt_ctx = detected + ctx_label = self.query_one("#console-context", Static) + ctx_label.update(f"Context: {self._prompt_ctx}") + + # Update command count. + self._cmd_count += 1 + count_label = self.query_one("#console-cmd-count", Static) + count_label.update(f" Commands: {self._cmd_count}") + + # ------------------------------------------------------------------ + # Event handlers + # ------------------------------------------------------------------ + + def on_input_submitted(self, event: Input.Submitted) -> None: + """Handle Enter key in the command input.""" + if event.input.id == "console-input": + self._do_send(event.value) + + def on_button_pressed(self, event: Button.Pressed) -> None: + """Handle the Send button click.""" + if event.button.id == "btn-send": + cmd_input = self.query_one("#console-input", Input) + self._do_send(cmd_input.value) + + def on_key(self, event: Key) -> None: + """Handle up/down arrow keys for command history navigation.""" + cmd_input = self.query_one("#console-input", Input) + + # Only respond when the input widget has focus. + if not cmd_input.has_focus: + return + + if event.key == "up": + event.prevent_default() + event.stop() + if not self._command_history: + return + self._history_idx = max(0, self._history_idx - 1) + cmd_input.value = self._command_history[self._history_idx] + cmd_input.cursor_position = len(cmd_input.value) + + elif event.key == "down": + event.prevent_default() + event.stop() + if not self._command_history: + return + self._history_idx = min(len(self._command_history), self._history_idx + 1) + if self._history_idx >= len(self._command_history): + cmd_input.value = "" + else: + cmd_input.value = self._command_history[self._history_idx] + cmd_input.cursor_position = len(cmd_input.value) + + elif event.key == "ctrl+l": + event.prevent_default() + event.stop() + serial_log = self.query_one("#serial-log", SerialLog) + serial_log.clear() diff --git a/tui/src/birdcage_tui/screens/position.py b/tui/src/birdcage_tui/screens/position.py new file mode 100644 index 0000000..da348c2 --- /dev/null +++ b/tui/src/birdcage_tui/screens/position.py @@ -0,0 +1,282 @@ +"""F1 Position screen -- AZ/EL display, manual moves, homing, engage/release. + +Widget container for ContentSwitcher. Polls the device at 2 Hz for +position and step data, drives the compass rose, motor status panel, +and AZ/EL sparklines. Bottom row provides manual move controls. +""" + +import logging +import time + +from textual import work +from textual.app import ComposeResult +from textual.binding import Binding +from textual.containers import Container, Horizontal, Vertical +from textual.widgets import Button, Input, Static +from textual.worker import Worker + +from birdcage_tui.widgets.compass_rose import CompassRose +from birdcage_tui.widgets.motor_status import MotorStatus +from birdcage_tui.widgets.sparkline_widget import SparklineWidget + +log = logging.getLogger(__name__) + + +class PositionScreen(Container): + """F1: Position control and monitoring.""" + + can_focus = True + + BINDINGS = [ + Binding("left", "nudge_az(-1)", "AZ -1", show=False), + Binding("right", "nudge_az(1)", "AZ +1", show=False), + Binding("up", "nudge_el(1)", "EL +1", show=False), + Binding("down", "nudge_el(-1)", "EL -1", show=False), + Binding("h", "home_both", "Home Both", show=False), + Binding("e", "engage_motors", "Engage", show=False), + Binding("r", "release_motors", "Release", show=False), + ] + + def __init__(self, **kwargs) -> None: + super().__init__(**kwargs) + self._device: object = None + self._polling = False + self._engaged = False + self._poll_worker: Worker | None = None + # Track last-known position for nudge commands. + self._last_az = 180.0 + self._last_el = 45.0 + + def compose(self) -> ComposeResult: + with Container(classes="screen-container"): + with Horizontal(classes="top-row"): + yield CompassRose(id="compass") + with Vertical(classes="panel"): + yield Static("Motor Status", classes="panel-title") + yield MotorStatus(id="motor-status") + with Vertical(): + yield SparklineWidget( + max_points=80, label="AZ", color="#00d4aa", id="az-spark" + ) + yield SparklineWidget( + max_points=80, label="EL", color="#00b8c8", id="el-spark" + ) + with Horizontal(classes="bottom-controls"): + yield Static("AZ ", classes="label") + yield Input(placeholder="180.0", id="az-input", type="number") + yield Static(" EL ", classes="label") + yield Input(placeholder="45.0", id="el-input", type="number") + yield Button("Move", id="btn-move", variant="primary") + yield Button("Home AZ", id="btn-home-az") + yield Button("Home EL", id="btn-home-el") + yield Button("E/R", id="btn-engage") + + # ------------------------------------------------------------------ + # Device lifecycle + # ------------------------------------------------------------------ + + def set_device(self, device: object) -> None: + """Store the device reference and start position polling.""" + self._device = device + self._polling = True + self._poll_worker = self._do_position_poll() + + def on_show(self) -> None: + """Resume polling when this screen becomes visible.""" + if self._device is not None and not self._polling: + self._polling = True + self._poll_worker = self._do_position_poll() + + # ------------------------------------------------------------------ + # Position poll worker + # ------------------------------------------------------------------ + + @work(thread=True, exclusive=True, group="position-poll") + def _do_position_poll(self) -> None: + """Poll device at ~2 Hz for position and step data.""" + while self._polling: + if self._device is None: + time.sleep(0.5) + continue + + try: + pos = self._device.get_position() + az = pos["azimuth"] + el = pos["elevation"] + self._last_az = az + self._last_el = el + + self.app.call_from_thread(self._update_compass, az, el) + self.app.call_from_thread(self._push_sparklines, az, el) + except Exception: + log.debug("Position poll failed", exc_info=True) + + try: + steps = self._device.get_step_positions() + self.app.call_from_thread( + self._update_motor_steps, + steps["az_steps"], + steps["el_steps"], + ) + except Exception: + log.debug("Step position poll failed", exc_info=True) + + # Poll torque state from A3981 + try: + torque_resp = self._device.get_a3981_torque() + lines = torque_resp.split("\n") + az_torque = "HIGH" if "HIGH" in lines[0] else "LOW" + el_torque = "HIGH" if len(lines) > 1 and "HIGH" in lines[1] else "LOW" + self.app.call_from_thread(self._update_torque, az_torque, el_torque) + except Exception: + log.debug("Torque poll failed", exc_info=True) + + time.sleep(0.5) + + # ------------------------------------------------------------------ + # Thread-safe widget update callbacks + # ------------------------------------------------------------------ + + def _update_compass(self, az: float, el: float) -> None: + compass = self.query_one("#compass", CompassRose) + compass.azimuth = az + compass.elevation = el + + def _push_sparklines(self, az: float, el: float) -> None: + self.query_one("#az-spark", SparklineWidget).push(az) + self.query_one("#el-spark", SparklineWidget).push(el) + + def _update_motor_steps(self, az_steps: int, el_steps: int) -> None: + motor = self.query_one("#motor-status", MotorStatus) + motor.az_steps = az_steps + motor.el_steps = el_steps + + def _update_torque(self, az_torque: str, el_torque: str) -> None: + motor = self.query_one("#motor-status", MotorStatus) + motor.az_torque = az_torque + motor.el_torque = el_torque + + def _update_engaged(self, engaged: bool) -> None: + motor = self.query_one("#motor-status", MotorStatus) + motor.engaged = engaged + + # ------------------------------------------------------------------ + # Button handlers + # ------------------------------------------------------------------ + + def on_button_pressed(self, event: Button.Pressed) -> None: + button_id = event.button.id or "" + + if button_id == "btn-move": + self._handle_move() + elif button_id == "btn-home-az": + self._handle_home(0) + elif button_id == "btn-home-el": + self._handle_home(1) + elif button_id == "btn-engage": + self._handle_engage_toggle() + + def _handle_move(self) -> None: + """Read AZ/EL inputs and issue a move command.""" + if self._device is None: + return + + az_input = self.query_one("#az-input", Input) + el_input = self.query_one("#el-input", Input) + + try: + az = float(az_input.value) if az_input.value.strip() else self._last_az + except ValueError: + self.app.notify("Invalid AZ value", severity="warning") + return + + try: + el = float(el_input.value) if el_input.value.strip() else self._last_el + except ValueError: + self.app.notify("Invalid EL value", severity="warning") + return + + self._run_motor_command(self._device.move_to, az, el) + + def _handle_home(self, motor_id: int) -> None: + """Home a specific motor axis.""" + if self._device is None: + return + axis = "AZ" if motor_id == 0 else "EL" + self.app.notify(f"Homing {axis}...", severity="information") + self._run_motor_command(self._device.home_motor, motor_id) + + def _handle_engage_toggle(self) -> None: + """Toggle motor engage/release state.""" + if self._device is None: + return + + if self._engaged: + self._run_motor_command(self._device.release) + self._engaged = False + self._update_engaged(False) + self.app.notify("Motors released") + else: + self._run_motor_command(self._device.engage) + self._engaged = True + self._update_engaged(True) + self.app.notify("Motors engaged") + + # ------------------------------------------------------------------ + # Key binding actions + # ------------------------------------------------------------------ + + def action_nudge_az(self, delta: int) -> None: + """Nudge azimuth by delta degrees.""" + if self._device is None: + return + new_az = self._last_az + delta + self._run_motor_command(self._device.move_motor, 0, new_az) + + def action_nudge_el(self, delta: int) -> None: + """Nudge elevation by delta degrees.""" + if self._device is None: + return + new_el = self._last_el + delta + self._run_motor_command(self._device.move_motor, 1, new_el) + + def action_home_both(self) -> None: + """Home both AZ and EL motors.""" + if self._device is None: + return + self.app.notify("Homing AZ + EL...", severity="information") + self._run_motor_command(self._device.home_motor, 0) + self._run_motor_command(self._device.home_motor, 1) + + def action_engage_motors(self) -> None: + """Engage (energize) stepper motors.""" + if self._device is None: + return + self._run_motor_command(self._device.engage) + self._engaged = True + self._update_engaged(True) + self.app.notify("Motors engaged") + + def action_release_motors(self) -> None: + """Release (de-energize) stepper motors.""" + if self._device is None: + return + self._run_motor_command(self._device.release) + self._engaged = False + self._update_engaged(False) + self.app.notify("Motors released") + + # ------------------------------------------------------------------ + # Helpers + # ------------------------------------------------------------------ + + @work(thread=True, exclusive=False, group="motor-cmd") + def _run_motor_command(self, fn, *args) -> None: + """Execute a motor command in a worker thread.""" + try: + fn(*args) + except Exception: + log.exception("Motor command failed") + self.app.call_from_thread( + self.app.notify, "Motor command failed", severity="error" + ) diff --git a/tui/src/birdcage_tui/screens/scan.py b/tui/src/birdcage_tui/screens/scan.py new file mode 100644 index 0000000..a194196 --- /dev/null +++ b/tui/src/birdcage_tui/screens/scan.py @@ -0,0 +1,272 @@ +"""F3 Scan Screen -- AZ sweep heatmap, sky mapping with configurable parameters. + +Grid-based sky scan: iterates over AZ/EL range, moves the dish to each point, +reads RSSI, and paints the result into a 2D heatmap. Supports CSV export of +raw (az, el, rssi) data for offline analysis. +""" + +import csv +import logging +import time +from pathlib import Path + +from textual import work +from textual.containers import Container, Horizontal, Vertical +from textual.widgets import Button, Input, ProgressBar, Static +from textual.worker import get_current_worker + +from birdcage_tui.widgets.sky_heatmap import SkyHeatmap +from birdcage_tui.widgets.sparkline_widget import SparklineWidget + +log = logging.getLogger(__name__) + +# Type alias -- SerialBridge and DemoDevice share the same duck-typed interface. +DeviceLike = object + + +class ScanScreen(Container): + """F3: Sky scan and RF mapping.""" + + def __init__(self, **kwargs) -> None: + super().__init__(**kwargs) + self._device: DeviceLike | None = None + self._scanning = False + self._scan_data: list[tuple[float, float, float]] = [] + + def compose(self): + with Container(classes="screen-container"): + with Vertical(classes="panel"): + yield Static("Sky Scan", classes="panel-title") + yield SkyHeatmap(az_bins=40, el_bins=10, id="heatmap") + yield SparklineWidget( + max_points=80, label="Sweep RSSI", color="#00d4aa", id="sweep-spark" + ) + with Horizontal(classes="scan-status"): + yield Static("Idle", id="scan-status-text") + yield ProgressBar(id="scan-progress", total=100, show_eta=False) + with Horizontal(classes="bottom-controls"): + yield Static("AZ ", classes="label") + yield Input(value="160", id="az-start", type="number") + yield Static("-", classes="label") + yield Input(value="220", id="az-end", type="number") + yield Static(" Step ", classes="label") + yield Input(value="1.5", id="az-step", type="number") + yield Static(" EL ", classes="label") + yield Input(value="18", id="el-start", type="number") + yield Static("-", classes="label") + yield Input(value="65", id="el-end", type="number") + yield Static(" Step ", classes="label") + yield Input(value="5.0", id="el-step", type="number") + with Horizontal(classes="bottom-controls"): + yield Static("Transponders ", classes="label") + yield Input(value="3", id="xponder-input", type="integer") + yield Button("Start Scan", id="btn-start-scan", variant="primary") + yield Button("Stop", id="btn-stop-scan") + yield Button("Export CSV", id="btn-export") + + # ------------------------------------------------------------------ + # Device wiring + # ------------------------------------------------------------------ + + def set_device(self, device: DeviceLike) -> None: + """Store the device reference (SerialBridge or DemoDevice).""" + self._device = device + + # ------------------------------------------------------------------ + # Input helpers + # ------------------------------------------------------------------ + + def _read_float(self, widget_id: str, fallback: float) -> float: + """Read a float from an Input widget, returning *fallback* on parse error.""" + try: + return float(self.query_one(f"#{widget_id}", Input).value) + except (ValueError, TypeError): + return fallback + + def _read_int(self, widget_id: str, fallback: int) -> int: + """Read an int from an Input widget, returning *fallback* on parse error.""" + try: + return int(self.query_one(f"#{widget_id}", Input).value) + except (ValueError, TypeError): + return fallback + + # ------------------------------------------------------------------ + # Scan worker + # ------------------------------------------------------------------ + + @work(thread=True) + def _do_scan(self) -> None: + """Execute the AZ/EL grid scan in a background thread.""" + worker = get_current_worker() + device = self._device + if device is None: + return + + # Read scan parameters (widget access must happen via call_from_thread + # for Input.value, but Textual Input.value is a reactive that is safe + # to read from threads as a string snapshot). + az_start = self._read_float("az-start", 160.0) + az_end = self._read_float("az-end", 220.0) + az_step = self._read_float("az-step", 1.5) + el_start = self._read_float("el-start", 18.0) + el_end = self._read_float("el-end", 65.0) + el_step = self._read_float("el-step", 5.0) + iterations = self._read_int("xponder-input", 3) + + # Clamp step sizes to something sane. + if az_step <= 0: + az_step = 1.0 + if el_step <= 0: + el_step = 1.0 + + # Build the grid point list. + el_values: list[float] = [] + el = el_start + while el <= el_end + 1e-9: + el_values.append(round(el, 2)) + el += el_step + + az_values: list[float] = [] + az = az_start + while az <= az_end + 1e-9: + az_values.append(round(az, 2)) + az += az_step + + total_points = len(el_values) * len(az_values) + if total_points == 0: + self.app.call_from_thread( + self._set_status, "No grid points -- check parameters" + ) + return + + heatmap = self.query_one("#heatmap", SkyHeatmap) + spark = self.query_one("#sweep-spark", SparklineWidget) + + done = 0 + + for _el_idx, el_val in enumerate(el_values): + for _az_idx, az_val in enumerate(az_values): + if not self._scanning or worker.is_cancelled: + self.app.call_from_thread(self._set_status, "Scan stopped") + return + + # Move dish. + try: + device.move_to(az_val, el_val) + except Exception: + log.exception("move_to failed at AZ=%.2f EL=%.2f", az_val, el_val) + msg = f"Move error at AZ={az_val:.1f} EL={el_val:.1f}" + self.app.call_from_thread(self._set_status, msg) + continue + + # Settle time -- let the motor stop and vibrations damp. + time.sleep(0.3) + + # Read signal. + try: + rssi_data = device.get_rssi(iterations) + rssi = float(rssi_data.get("average", 0)) + except Exception: + log.exception("get_rssi failed at AZ=%.2f EL=%.2f", az_val, el_val) + rssi = 0.0 + + # Record raw data. + self._scan_data.append((az_val, el_val, rssi)) + + # Map to heatmap grid indices -- fit into fixed-size bins. + az_span = az_end - az_start + 1e-9 + el_span = el_end - el_start + 1e-9 + grid_az = min( + int((az_val - az_start) / az_span * heatmap.az_bins), + heatmap.az_bins - 1, + ) + grid_el = min( + int((el_val - el_start) / el_span * heatmap.el_bins), + heatmap.el_bins - 1, + ) + + # Update widgets. + self.app.call_from_thread(heatmap.set_point, grid_az, grid_el, rssi) + self.app.call_from_thread(heatmap.set_active, grid_az, grid_el) + self.app.call_from_thread(spark.push, rssi) + + done += 1 + pct = int(done * 100 / total_points) + status_text = ( + f"Scanning AZ={az_val:.1f} EL={el_val:.1f} " + f"RSSI={rssi:.0f} [{done}/{total_points}]" + ) + self.app.call_from_thread(self._set_progress, pct, status_text) + + msg = f"Scan complete -- {total_points} points" + self.app.call_from_thread(self._set_status, msg) + + # ------------------------------------------------------------------ + # Widget update helpers (called via call_from_thread) + # ------------------------------------------------------------------ + + def _set_status(self, text: str) -> None: + """Update the scan status text label.""" + self.query_one("#scan-status-text", Static).update(text) + + def _set_progress(self, pct: int, status_text: str) -> None: + """Update both progress bar and status text.""" + self.query_one("#scan-progress", ProgressBar).update(progress=pct) + self.query_one("#scan-status-text", Static).update(status_text) + + # ------------------------------------------------------------------ + # Button handlers + # ------------------------------------------------------------------ + + def on_button_pressed(self, event: Button.Pressed) -> None: + button_id = event.button.id or "" + + if button_id == "btn-start-scan": + self._start_scan() + elif button_id == "btn-stop-scan": + self._stop_scan() + elif button_id == "btn-export": + self._export_csv() + + def _start_scan(self) -> None: + """Clear state and kick off the scan worker.""" + if self._device is None: + self.app.notify("No device connected", severity="warning") + return + + if self._scanning: + self.app.notify("Scan already in progress", severity="warning") + return + + # Reset. + heatmap = self.query_one("#heatmap", SkyHeatmap) + heatmap.clear() + self._scan_data.clear() + self.query_one("#scan-progress", ProgressBar).update(progress=0) + self._set_status("Starting scan...") + + self._scanning = True + self._do_scan() + + def _stop_scan(self) -> None: + """Signal the scan worker to stop.""" + self._scanning = False + self._set_status("Stopping...") + + def _export_csv(self) -> None: + """Write scan data to /tmp/birdcage_scan.csv.""" + if not self._scan_data: + self.app.notify("No scan data to export", severity="warning") + return + + output = Path("/tmp/birdcage_scan.csv") + try: + with output.open("w", newline="") as fh: + writer = csv.writer(fh) + writer.writerow(["az", "el", "rssi"]) + for az, el, rssi in self._scan_data: + writer.writerow([f"{az:.2f}", f"{el:.2f}", f"{rssi:.1f}"]) + self.app.notify(f"Exported {len(self._scan_data)} points to {output}") + except OSError as exc: + log.exception("CSV export failed") + self.app.notify(f"Export failed: {exc}", severity="error") diff --git a/tui/src/birdcage_tui/screens/signal.py b/tui/src/birdcage_tui/screens/signal.py new file mode 100644 index 0000000..0903c0a --- /dev/null +++ b/tui/src/birdcage_tui/screens/signal.py @@ -0,0 +1,233 @@ +"""F2 Signal screen -- RSSI monitoring, sparklines, LNB control. + +Widget container for ContentSwitcher. Provides start/stop signal +monitoring with configurable iteration count and poll rate, dual +sparklines for DVB and ADC RSSI, peak tracking, and LNA toggle. +""" + +import logging +import re +import time + +from textual import work +from textual.app import ComposeResult +from textual.containers import Container, Horizontal, Vertical +from textual.widgets import Button, Input, Static +from textual.worker import Worker + +from birdcage_tui.widgets.signal_gauge import SignalGauge +from birdcage_tui.widgets.sparkline_widget import SparklineWidget + +log = logging.getLogger(__name__) + + +class SignalScreen(Container): + """F2: Signal monitoring and RSSI display.""" + + def __init__(self, **kwargs) -> None: + super().__init__(**kwargs) + self._device: object = None + self._monitoring = False + self._lna_enabled = False + self._peak_rssi = 0 + self._total_samples = 0 + self._signal_worker: Worker | None = None + + def compose(self) -> ComposeResult: + with Container(classes="screen-container"): + with Vertical(classes="panel"): + yield Static("Signal Strength", classes="panel-title") + yield SignalGauge(id="signal-gauge") + with Vertical(): + yield SparklineWidget( + max_points=80, label="DVB RSSI", color="#00d4aa", id="dvb-spark" + ) + yield SparklineWidget( + max_points=80, label="ADC RSSI", color="#2080d0", id="adc-spark" + ) + with Horizontal(classes="panel"): + yield Static("Samples: 0", id="sample-count", classes="label") + yield Static(" Peak: 0", id="peak-value", classes="label") + yield Static(" LNA: OFF", id="lna-status", classes="label") + yield Static(" Lock: NO", id="lock-status", classes="label") + with Horizontal(classes="bottom-controls"): + yield Static("Iters ", classes="label") + yield Input(value="10", id="iter-input", type="integer") + yield Static(" Rate ", classes="label") + yield Input(value="2", id="rate-input", type="integer") + yield Button("Start", id="btn-start", variant="primary") + yield Button("Stop", id="btn-stop") + yield Button("Enable LNA", id="btn-lna") + + # ------------------------------------------------------------------ + # Device lifecycle + # ------------------------------------------------------------------ + + def set_device(self, device: object) -> None: + """Store the device reference.""" + self._device = device + + def on_show(self) -> None: + """Called when this screen becomes visible.""" + pass # Monitoring is explicit via Start/Stop buttons. + + # ------------------------------------------------------------------ + # Signal poll worker + # ------------------------------------------------------------------ + + @work(thread=True, exclusive=True, group="signal-poll") + def _do_signal_poll(self) -> None: + """Poll RSSI at the configured rate while monitoring is active.""" + while self._monitoring: + if self._device is None: + time.sleep(0.5) + continue + + # Read config from inputs (safe defaults on parse failure). + try: + iterations = int( + self.app.call_from_thread(self._read_input, "iter-input") or "10" + ) + iterations = max(1, iterations) + except (ValueError, TypeError): + iterations = 10 + + try: + rate = int( + self.app.call_from_thread(self._read_input, "rate-input") or "2" + ) + rate = max(1, rate) + except (ValueError, TypeError): + rate = 2 + + # DVB RSSI (bounded, averaged) + try: + rssi = self._device.get_rssi(iterations) + rssi_avg = rssi["average"] + rssi_cur = rssi["current"] + reads = rssi["reads"] + + self._total_samples += reads + if rssi_cur > self._peak_rssi: + self._peak_rssi = rssi_cur + + self.app.call_from_thread(self._update_gauge, rssi_avg, rssi_cur, reads) + self.app.call_from_thread(self._push_dvb_spark, float(rssi_avg)) + self.app.call_from_thread(self._update_stats) + except Exception: + log.debug("DVB RSSI poll failed", exc_info=True) + + # ADC RSSI (raw single-shot) + try: + adc_resp = self._device.get_adc_rssi() + adc_match = re.search(r"(\d+)", adc_resp) + if adc_match: + adc_val = float(adc_match.group(1)) + self.app.call_from_thread(self._push_adc_spark, adc_val) + except Exception: + log.debug("ADC RSSI poll failed", exc_info=True) + + # Lock status + try: + lock_resp = self._device.get_lock_status() + lock_match = re.search(r"Lock:(\d)", lock_resp) + if lock_match: + locked = lock_match.group(1) == "1" + self.app.call_from_thread(self._update_lock, locked) + except Exception: + log.debug("Lock status poll failed", exc_info=True) + + time.sleep(1.0 / rate) + + # ------------------------------------------------------------------ + # Thread-safe widget update callbacks + # ------------------------------------------------------------------ + + def _read_input(self, input_id: str) -> str: + """Read an Input widget's value (must run on main thread).""" + return self.query_one(f"#{input_id}", Input).value + + def _update_gauge(self, rssi_avg: int, rssi_cur: int, reads: int) -> None: + gauge = self.query_one("#signal-gauge", SignalGauge) + gauge.rssi_avg = rssi_avg + gauge.rssi_cur = rssi_cur + gauge.reads = reads + + def _push_dvb_spark(self, value: float) -> None: + self.query_one("#dvb-spark", SparklineWidget).push(value) + + def _push_adc_spark(self, value: float) -> None: + self.query_one("#adc-spark", SparklineWidget).push(value) + + def _update_stats(self) -> None: + self.query_one("#sample-count", Static).update( + f"Samples: {self._total_samples}" + ) + self.query_one("#peak-value", Static).update(f" Peak: {self._peak_rssi}") + + def _update_lock(self, locked: bool) -> None: + label = "YES" if locked else "NO" + self.query_one("#lock-status", Static).update(f" Lock: {label}") + + def _update_lna_label(self) -> None: + label = "ON" if self._lna_enabled else "OFF" + self.query_one("#lna-status", Static).update(f" LNA: {label}") + + # ------------------------------------------------------------------ + # Button handlers + # ------------------------------------------------------------------ + + def on_button_pressed(self, event: Button.Pressed) -> None: + button_id = event.button.id or "" + + if button_id == "btn-start": + self._handle_start() + elif button_id == "btn-stop": + self._handle_stop() + elif button_id == "btn-lna": + self._handle_lna() + + def _handle_start(self) -> None: + """Start signal monitoring.""" + if self._device is None: + self.app.notify("No device connected", severity="warning") + return + + if self._monitoring: + return + + self._monitoring = True + self._signal_worker = self._do_signal_poll() + self.app.notify("Signal monitoring started") + + self.query_one("#btn-start", Button).variant = "default" + self.query_one("#btn-stop", Button).variant = "warning" + + def _handle_stop(self) -> None: + """Stop signal monitoring.""" + self._monitoring = False + self.app.notify("Signal monitoring stopped") + + self.query_one("#btn-start", Button).variant = "primary" + self.query_one("#btn-stop", Button).variant = "default" + + def _handle_lna(self) -> None: + """Toggle LNA enable (sends lnbdc odu to set 13V).""" + if self._device is None: + self.app.notify("No device connected", severity="warning") + return + self._do_enable_lna() + + @work(thread=True, exclusive=False, group="signal-cmd") + def _do_enable_lna(self) -> None: + """Enable LNA in a worker thread (blocks on serial I/O).""" + try: + self._device.enable_lna() + self._lna_enabled = True + self.app.call_from_thread(self._update_lna_label) + self.app.call_from_thread(self.app.notify, "LNA enabled (13V ODU)") + except Exception: + log.exception("LNA enable failed") + self.app.call_from_thread( + self.app.notify, "LNA enable failed", severity="error" + ) diff --git a/tui/src/birdcage_tui/screens/system.py b/tui/src/birdcage_tui/screens/system.py new file mode 100644 index 0000000..13719a0 --- /dev/null +++ b/tui/src/birdcage_tui/screens/system.py @@ -0,0 +1,358 @@ +"""F4 System Screen -- NVS table, A3981 diagnostics, motor dynamics, firmware info. + +Aggregates hardware identity, stepper driver status, motor tuning parameters, +and the full non-volatile storage table into a single dashboard panel. All data +is fetched from the device in a background worker thread and pushed to widgets +via call_from_thread. +""" + +import json +import logging +import re +from pathlib import Path + +from rich.text import Text +from textual import work +from textual.containers import Container, Horizontal, Vertical +from textual.widgets import Button, Static +from textual.worker import get_current_worker + +from birdcage_tui.widgets.nvs_table import NvsTable + +log = logging.getLogger(__name__) + +# Type alias -- SerialBridge and DemoDevice share the same duck-typed interface. +DeviceLike = object + + +def _parse_firmware_info(raw: str) -> Text: + """Extract version, clock speed, and antenna ID from ``os > id`` output. + + Returns a styled Rich Text suitable for a Static widget. + """ + version = "?" + clock = "?" + ant_id = "?" + + m = re.search(r"Software version:\s*(\S+)", raw) + if m: + version = m.group(1) + + m = re.search(r"CCLK:\s*(\d+)", raw) + if m: + mhz = int(m.group(1)) // 1_000_000 + clock = f"{mhz}MHz" + + m = re.search(r"Ant ID:\s*(.+?)$", raw, re.MULTILINE) + if m: + ant_id = m.group(1).strip() + + result = Text() + result.append("FW: ", style="#506878") + result.append(version, style="bold #00d4aa") + result.append(" | ", style="#1a2a38") + result.append("MCU: ", style="#506878") + result.append(f"K60 {clock}", style="#c8d0d8") + result.append(" | ", style="#1a2a38") + result.append("Ant: ", style="#506878") + result.append(ant_id, style="#c8d0d8") + return result + + +def _format_a3981(diag: str, modes: dict[str, str], torque: str) -> Text: + """Combine A3981 diagnostic, mode, and torque data into styled text.""" + result = Text() + + # Diagnostics -- highlight OK in green, FAULT in red. + for line in diag.splitlines(): + line = line.strip() + if not line: + continue + if "FAULT" in line.upper(): + result.append(line, style="bold #e04040") + elif "OK" in line.upper(): + result.append(line, style="#00e060") + else: + result.append(line, style="#c8d0d8") + result.append("\n") + + # Step mode. + sm = modes.get("step_mode", "") + for line in sm.splitlines(): + line = line.strip() + if line: + result.append(line, style="#506878") + result.append("\n") + + # Current mode. + cm = modes.get("current_mode", "") + for line in cm.splitlines(): + line = line.strip() + if line: + result.append(line, style="#506878") + result.append("\n") + + # Torque -- HIGH in warm color, LOW in dim. + for line in torque.splitlines(): + line = line.strip() + if not line: + continue + if "HIGH" in line.upper(): + result.append(line, style="#e8c020") + else: + result.append(line, style="#384858") + result.append("\n") + + # Trim trailing newline. + text_str = result.plain + if text_str.endswith("\n"): + result.right_crop(1) + + return result + + +def _format_motor_dynamics( + dynamics: dict[str, float], + el_limits: dict[str, float], +) -> Text: + """Format motor velocity, acceleration, and EL limits into styled text.""" + result = Text() + + az_vel = dynamics.get("az_max_vel", 0.0) + el_vel = dynamics.get("el_max_vel", 0.0) + az_acc = dynamics.get("az_accel", 0.0) + el_acc = dynamics.get("el_accel", 0.0) + + result.append("AZ Max Vel: ", style="#506878") + result.append(f"{az_vel:.1f}", style="bold #c8d0d8") + result.append("\u00b0/s", style="#506878") + result.append(" ", style="#0e1420") + result.append("EL Max Vel: ", style="#506878") + result.append(f"{el_vel:.1f}", style="bold #c8d0d8") + result.append("\u00b0/s", style="#506878") + result.append("\n") + + result.append("AZ Accel: ", style="#506878") + result.append(f"{az_acc:.1f}", style="bold #c8d0d8") + result.append("\u00b0/s\u00b2", style="#506878") + result.append(" ", style="#0e1420") + result.append("EL Accel: ", style="#506878") + result.append(f"{el_acc:.1f}", style="bold #c8d0d8") + result.append("\u00b0/s\u00b2", style="#506878") + result.append("\n") + + result.append("Steps/Rev: ", style="#506878") + result.append("40000 / 24960", style="#c8d0d8") + + el_min = el_limits.get("min", 0.0) + el_max = el_limits.get("max", 0.0) + el_home = el_limits.get("home", 0.0) + + result.append("\n") + result.append("EL Range: ", style="#506878") + result.append(f"{el_min:.1f}", style="#c8d0d8") + result.append("\u00b0 - ", style="#506878") + result.append(f"{el_max:.1f}", style="#c8d0d8") + result.append("\u00b0", style="#506878") + result.append(" Home: ", style="#506878") + result.append(f"{el_home:.1f}", style="bold #00d4aa") + result.append("\u00b0", style="#506878") + + return result + + +class SystemScreen(Container): + """F4: System information, NVS, and diagnostics.""" + + def __init__(self, **kwargs) -> None: + super().__init__(**kwargs) + self._device: DeviceLike | None = None + self._refreshed = False + + def compose(self): + with Container(classes="screen-container"): + with Horizontal(classes="panel"): + yield Static("", id="firmware-info") + with Horizontal(classes="top-row"): + with Vertical(classes="panel"): + yield Static("A3981 Diagnostics", classes="panel-title") + yield Static("", id="a3981-diag") + with Vertical(classes="panel"): + yield Static("Motor Dynamics", classes="panel-title") + yield Static("", id="motor-dynamics") + with Vertical(classes="panel"): + yield Static("NVS Table", classes="panel-title") + yield NvsTable(id="nvs-table") + with Horizontal(classes="bottom-controls"): + yield Button("Refresh All", id="btn-refresh-all", variant="primary") + yield Button("Refresh NVS", id="btn-refresh-nvs") + yield Button("Export NVS JSON", id="btn-export-nvs") + + # ------------------------------------------------------------------ + # Device wiring + # ------------------------------------------------------------------ + + def set_device(self, device: DeviceLike) -> None: + """Store the device reference and trigger an initial refresh.""" + self._device = device + # Only auto-refresh if we're already mounted (widget tree exists). + try: + self.query_one("#firmware-info") + self._do_system_refresh() + except Exception: + # Not mounted yet -- on_show will handle it. + pass + + def on_show(self) -> None: + """Called when this screen becomes visible via ContentSwitcher.""" + if self._device is not None and not self._refreshed: + self._do_system_refresh() + + # ------------------------------------------------------------------ + # System refresh worker + # ------------------------------------------------------------------ + + @work(thread=True) + def _do_system_refresh(self) -> None: + """Fetch all system data from the device in a background thread.""" + worker = get_current_worker() + device = self._device + if device is None: + return + + # 1. Firmware identification. + try: + fw_raw = device.get_firmware_id() + fw_text = _parse_firmware_info(fw_raw) + self.app.call_from_thread( + self.query_one("#firmware-info", Static).update, fw_text + ) + except Exception: + log.exception("Failed to read firmware ID") + self.app.call_from_thread( + self.query_one("#firmware-info", Static).update, + Text("FW: error reading firmware ID", style="#e04040"), + ) + + if worker.is_cancelled: + return + + # 2. A3981 diagnostics. + try: + diag = device.get_a3981_diag() + modes = device.get_a3981_modes() + torque = device.get_a3981_torque() + a3981_text = _format_a3981(diag, modes, torque) + self.app.call_from_thread( + self.query_one("#a3981-diag", Static).update, a3981_text + ) + except Exception: + log.exception("Failed to read A3981 data") + self.app.call_from_thread( + self.query_one("#a3981-diag", Static).update, + Text("Error reading A3981 diagnostics", style="#e04040"), + ) + + if worker.is_cancelled: + return + + # 3. Motor dynamics. + try: + dynamics = device.get_motor_dynamics() + el_limits = device.get_el_limits() + motor_text = _format_motor_dynamics(dynamics, el_limits) + self.app.call_from_thread( + self.query_one("#motor-dynamics", Static).update, motor_text + ) + except Exception: + log.exception("Failed to read motor dynamics") + self.app.call_from_thread( + self.query_one("#motor-dynamics", Static).update, + Text("Error reading motor dynamics", style="#e04040"), + ) + + if worker.is_cancelled: + return + + # 4. NVS dump. + try: + nvs_text = device.nvs_dump() + nvs_table = self.query_one("#nvs-table", NvsTable) + self.app.call_from_thread(nvs_table.load_nvs, nvs_text) + except Exception: + log.exception("Failed to dump NVS") + self.app.call_from_thread( + self.app.notify, "NVS dump failed", severity="error" + ) + + self._refreshed = True + + # ------------------------------------------------------------------ + # NVS-only refresh worker + # ------------------------------------------------------------------ + + @work(thread=True) + def _do_nvs_refresh(self) -> None: + """Refresh just the NVS table without touching other panels.""" + device = self._device + if device is None: + return + + try: + nvs_text = device.nvs_dump() + nvs_table = self.query_one("#nvs-table", NvsTable) + self.app.call_from_thread(nvs_table.load_nvs, nvs_text) + self.app.call_from_thread(self.app.notify, "NVS table refreshed") + except Exception: + log.exception("Failed to refresh NVS") + self.app.call_from_thread( + self.app.notify, "NVS refresh failed", severity="error" + ) + + # ------------------------------------------------------------------ + # Button handlers + # ------------------------------------------------------------------ + + def on_button_pressed(self, event: Button.Pressed) -> None: + button_id = event.button.id or "" + + if button_id == "btn-refresh-all": + self._handle_refresh_all() + elif button_id == "btn-refresh-nvs": + self._handle_refresh_nvs() + elif button_id == "btn-export-nvs": + self._export_nvs_json() + + def _handle_refresh_all(self) -> None: + """Kick off a full system refresh.""" + if self._device is None: + self.app.notify("No device connected", severity="warning") + return + self._do_system_refresh() + + def _handle_refresh_nvs(self) -> None: + """Kick off an NVS-only refresh.""" + if self._device is None: + self.app.notify("No device connected", severity="warning") + return + self._do_nvs_refresh() + + def _export_nvs_json(self) -> None: + """Export parsed NVS rows to /tmp/birdcage_nvs.json.""" + nvs_table = self.query_one("#nvs-table", NvsTable) + rows = nvs_table.parsed_rows + + if not rows: + self.app.notify( + "No NVS data to export -- refresh first", severity="warning" + ) + return + + output = Path("/tmp/birdcage_nvs.json") + try: + with output.open("w") as fh: + json.dump(rows, fh, indent=2) + self.app.notify(f"Exported {len(rows)} NVS entries to {output}") + except OSError as exc: + log.exception("NVS JSON export failed") + self.app.notify(f"Export failed: {exc}", severity="error") diff --git a/tui/src/birdcage_tui/theme.tcss b/tui/src/birdcage_tui/theme.tcss new file mode 100644 index 0000000..f8f351d --- /dev/null +++ b/tui/src/birdcage_tui/theme.tcss @@ -0,0 +1,496 @@ +/* Birdcage TUI — Dark RF Theme + * Teal accent on deep blue-black. No purple. + * Signal gradient: blue > cyan > green > yellow > red + */ + +/* ── Global ────────────────────────────────────────── */ + +Screen { + background: #0a0a12; + color: #c8d0d8; +} + +Header { + background: #0e1420; + color: #00d4aa; + text-style: bold; + dock: top; + height: 1; +} + +Footer { + background: #0e1420; + color: #506878; + dock: bottom; + height: 1; +} + +/* ── Layout Containers ─────────────────────────────── */ + +#main-area { + layout: horizontal; + height: 1fr; +} + +#sidebar { + width: 26; + background: #0e1420; + border-right: solid #1a2a3a; + padding: 1 1; +} + +.sidebar-title { + color: #00d4aa; + text-style: bold; + text-align: center; + width: 100%; +} + +.sidebar-subtitle { + color: #506878; + text-align: center; + width: 100%; + margin: 0 0 1 0; +} + +#content-area { + width: 1fr; +} + +ContentSwitcher { + width: 1fr; + height: 1fr; +} + +/* ── Sidebar Buttons ───────────────────────────────── */ + +.sidebar-btn { + width: 100%; + height: 3; + margin: 0 0 1 0; + background: #121c2a; + color: #7090a8; + text-style: bold; + border: round #1a3050; + text-align: center; +} + +.sidebar-btn:hover { + background: #1a2a40; + color: #00d4aa; + border: round #00d4aa; +} + +.sidebar-btn.active { + background: #0a2a3a; + color: #00d4aa; + border: round #00d4aa; + text-style: bold; +} + +/* ── Panel / Card ──────────────────────────────────── */ + +.panel { + background: #0e1420; + border: round #1a2a3a; + padding: 1 2; + margin: 0 1 1 1; +} + +.panel-title { + color: #00d4aa; + text-style: bold; + margin-bottom: 1; +} + +/* ── Data Display ──────────────────────────────────── */ + +.value-large { + color: #00d4aa; + text-style: bold; +} + +.value-normal { + color: #c8d0d8; +} + +.label { + color: #506878; +} + +.label-dim { + color: #384858; +} + +/* ── Status Indicators ─────────────────────────────── */ + +.status-ok { + color: #00e060; +} + +.status-warn { + color: #e8a020; +} + +.status-error { + color: #e04040; +} + +.status-demo { + color: #e8a020; + text-style: italic; +} + +/* ── Input Controls ────────────────────────────────── */ + +Input { + background: #121c2a; + border: round #1a3050; + color: #c8d0d8; + padding: 0 1; +} + +Input:focus { + border: round #00d4aa; +} + +Button { + background: #1a2a40; + color: #00d4aa; + border: round #1a3050; + min-width: 10; + height: 3; +} + +Button:hover { + background: #00d4aa; + color: #0a0a12; + border: round #00d4aa; +} + +Button:focus { + border: round #00d4aa; + text-style: bold; +} + +Button.-active { + background: #0a3a3a; +} + +/* ── DataTable ─────────────────────────────────────── */ + +DataTable { + background: #0a0a12; + color: #c8d0d8; + height: 1fr; +} + +DataTable > .datatable--header { + background: #0e1420; + color: #00d4aa; + text-style: bold; +} + +DataTable > .datatable--cursor { + background: #142030; + color: #c8d0d8; +} + +DataTable > .datatable--even-row { + background: #0a0a12; +} + +DataTable > .datatable--odd-row { + background: #0c0e18; +} + +/* ── RichLog ───────────────────────────────────────── */ + +RichLog { + background: #0a0a12; + color: #c8d0d8; + border: round #1a2a3a; + scrollbar-color: #1a2a38; + scrollbar-color-active: #00d4aa; + scrollbar-color-hover: #2a4a58; + height: 1fr; +} + +/* ── Progress Bar ──────────────────────────────────── */ + +ProgressBar { + padding: 0 1; +} + +ProgressBar Bar { + color: #00d4aa; + background: #1a2a38; +} + +ProgressBar PercentageStatus { + color: #506878; + text-style: bold; +} + +/* ── Sparkline ─────────────────────────────────────── */ + +.sparkline { + height: 2; + padding: 0 1; + color: #00d4aa; + background: #0e1420; +} + +.sparkline-label { + color: #506878; + width: 14; +} + +/* ── Compass Rose ──────────────────────────────────── */ + +#compass-container { + height: auto; + min-height: 14; + padding: 1; +} + +.compass-readout { + color: #00d4aa; + text-style: bold; +} + +/* ── Signal Gauge ──────────────────────────────────── */ + +.gauge-container { + height: 3; + padding: 0 1; +} + +.gauge-bar { + height: 1; +} + +.gauge-label { + color: #506878; + width: 8; +} + +.gauge-value { + color: #c8d0d8; + width: 8; + text-align: right; +} + +/* ── Signal Colors (gradient: blue > cyan > green > yellow > red) ── */ + +.signal-cold { + color: #2080d0; +} + +.signal-cool { + color: #00b8c8; +} + +.signal-mid { + color: #00e060; +} + +.signal-warm { + color: #e8c020; +} + +.signal-hot { + color: #e04040; +} + +/* ── Sky Heatmap ───────────────────────────────────── */ + +.heatmap-container { + height: 1fr; + padding: 0; +} + +.heatmap-cell { + width: 2; + height: 1; +} + +/* ── Motor Status ──────────────────────────────────── */ + +.motor-panel { + height: auto; + padding: 1 2; + background: #0e1420; + border: round #1a2a3a; +} + +.motor-row { + layout: horizontal; + height: 1; +} + +.motor-label { + color: #506878; + width: 14; +} + +.motor-value { + color: #c8d0d8; +} + +/* ── Device Status Bar (sidebar bottom) ────────────── */ + +#device-status { + dock: bottom; + height: auto; + padding: 1; + background: #0e1420; + border-top: solid #1a2a38; +} + +.device-status-label { + color: #506878; +} + +.device-status-value { + color: #c8d0d8; +} + +.device-connected { + color: #00e060; +} + +.device-demo { + color: #e8a020; +} + +/* ── Console Screen ────────────────────────────────── */ + +.console-input-area { + dock: bottom; + height: 3; + layout: horizontal; + padding: 0 1; + background: #0e1420; + border-top: solid #1a2a38; +} + +.console-input-area Input { + width: 1fr; +} + +.console-input-area Button { + width: 10; + margin-left: 1; +} + +.console-context { + dock: bottom; + height: 1; + padding: 0 1; + background: #0e1420; + color: #506878; +} + +/* ── Prompt Colors (by submenu) ────────────────────── */ + +.prompt-trk { + color: #00d4aa; +} + +.prompt-mot { + color: #00e060; +} + +.prompt-dvb { + color: #2080d0; +} + +.prompt-nvs { + color: #e8a020; +} + +.prompt-a3981 { + color: #00b8c8; +} + +.prompt-step { + color: #40c0a0; +} + +.prompt-os { + color: #8090a0; +} + +.prompt-other { + color: #506878; +} + +/* ── Scan Screen ───────────────────────────────────── */ + +.scan-controls { + dock: bottom; + height: auto; + padding: 1; + background: #0e1420; + border-top: solid #1a2a38; +} + +.scan-status { + height: 2; + padding: 0 1; + color: #506878; +} + +/* ── Screen-Level Layouts ──────────────────────────── */ + +.screen-container { + layout: vertical; + height: 1fr; + width: 1fr; +} + +.top-row { + layout: horizontal; + height: 1fr; +} + +.bottom-controls { + dock: bottom; + height: auto; + padding: 1; + background: #0e1420; + border-top: solid #1a2a38; + layout: horizontal; +} + +.control-group { + layout: horizontal; + height: 3; + width: 1fr; +} + +.control-group Input { + width: 8; + margin-right: 1; +} + +.control-group Button { + margin-right: 1; +} + +/* ── NVS Table Highlight ───────────────────────────── */ + +.nvs-modified { + color: #e8a020; + text-style: bold; +} + +/* ── Scrollbar Styling ─────────────────────────────── */ + +* { + scrollbar-color: #1a2a38; + scrollbar-color-active: #00d4aa; + scrollbar-color-hover: #2a4a58; + scrollbar-background: #0a0a12; +} diff --git a/tui/src/birdcage_tui/widgets/__init__.py b/tui/src/birdcage_tui/widgets/__init__.py new file mode 100644 index 0000000..ea4216e --- /dev/null +++ b/tui/src/birdcage_tui/widgets/__init__.py @@ -0,0 +1,21 @@ +"""Custom widgets for the Birdcage TUI.""" + +from birdcage_tui.widgets.compass_rose import CompassRose +from birdcage_tui.widgets.device_status_bar import DeviceStatusBar +from birdcage_tui.widgets.motor_status import MotorStatus +from birdcage_tui.widgets.nvs_table import NvsTable +from birdcage_tui.widgets.serial_log import SerialLog +from birdcage_tui.widgets.signal_gauge import SignalGauge +from birdcage_tui.widgets.sky_heatmap import SkyHeatmap +from birdcage_tui.widgets.sparkline_widget import SparklineWidget + +__all__ = [ + "CompassRose", + "DeviceStatusBar", + "MotorStatus", + "NvsTable", + "SerialLog", + "SignalGauge", + "SkyHeatmap", + "SparklineWidget", +] diff --git a/tui/src/birdcage_tui/widgets/compass_rose.py b/tui/src/birdcage_tui/widgets/compass_rose.py new file mode 100644 index 0000000..a6d3c14 --- /dev/null +++ b/tui/src/birdcage_tui/widgets/compass_rose.py @@ -0,0 +1,183 @@ +"""Compass rose widget — visual AZ/EL position display with Unicode compass dial.""" + +from rich.text import Text +from textual.reactive import reactive +from textual.widgets import Static + +# Compass grid layout: 11 columns x 7 rows. +# Positions indexed [row][col] where (0,0) is top-left. +# Cardinal/intercardinal markers are placed at fixed positions. +# The pointer occupies one of 16 perimeter slots based on azimuth. + +# 16-slot perimeter positions (clockwise from N=0): +# Each entry is (row, col) on the 11x7 grid. +_POINTER_SLOTS: list[tuple[int, int]] = [ + (0, 5), # 0: N + (0, 7), # 1: NNE + (1, 9), # 2: NE + (2, 10), # 3: ENE + (3, 10), # 4: E + (4, 10), # 5: ESE + (5, 9), # 6: SE + (6, 7), # 7: SSE + (6, 5), # 8: S + (6, 3), # 9: SSW + (5, 1), # 10: SW + (4, 0), # 11: WSW + (3, 0), # 12: W + (2, 0), # 13: WNW + (1, 1), # 14: NW + (0, 3), # 15: NNW +] + +# Fixed cardinal/intercardinal label positions: (row, col, label) +_LABELS: list[tuple[int, int, str]] = [ + (0, 5, "N"), + (3, 10, "E"), + (6, 5, "S"), + (3, 0, "W"), +] + +# Ring structure characters for the compass dial. +_RING_CHARS: dict[tuple[int, int], str] = { + # Top arc + (0, 3): ".", + (0, 4): "\u2500", + (0, 6): "\u2500", + (0, 7): ".", + # Upper sides + (1, 1): "/", + (1, 9): "\\", + # Mid-upper sides + (2, 0): "\u2502", + (2, 10): "\u2502", + # Center sides (cardinals placed separately) + # (3, 0) and (3, 10) reserved for W/E labels + # Lower-mid sides + (4, 0): "\u2502", + (4, 10): "\u2502", + # Lower sides + (5, 1): "\\", + (5, 9): "/", + # Bottom arc + (6, 3): "'", + (6, 4): "\u2500", + (6, 6): "\u2500", + (6, 7): "'", +} + + +def _azimuth_to_slot(az: float) -> int: + """Map azimuth (0-360, 0=N clockwise) to one of 16 perimeter slots.""" + normalized = az % 360.0 + slot = round(normalized / 22.5) % 16 + return slot + + +class CompassRose(Static): + """Visual compass display showing azimuth/elevation position.""" + + azimuth: reactive[float] = reactive(180.0) + elevation: reactive[float] = reactive(45.0) + + def render(self) -> Text: + result = Text() + + # Large numeric readout + az_label = Text("AZ ", style="#506878 bold") + az_value = Text(f"{self.azimuth:7.2f}\u00b0", style="#00d4aa bold") + el_label = Text(" EL ", style="#506878 bold") + el_value = Text(f"{self.elevation:6.2f}\u00b0", style="#00d4aa bold") + + result.append(az_label) + result.append(az_value) + result.append(el_label) + result.append(el_value) + result.append("\n\n") + + # Build the 7x11 compass grid + grid: list[list[tuple[str, str]]] = [ + [(" ", "#0e1420") for _ in range(11)] for _ in range(7) + ] + + # Place ring structure + for (r, c), ch in _RING_CHARS.items(): + grid[r][c] = (ch, "#c8d0d8") + + # Place cardinal labels + for r, c, label in _LABELS: + grid[r][c] = (label, "#506878 bold") + + # Center crosshair + grid[3][5] = ("\u253c", "#1a2a38") + grid[3][4] = ("\u2500", "#1a2a38") + grid[3][6] = ("\u2500", "#1a2a38") + grid[2][5] = ("\u2502", "#1a2a38") + grid[4][5] = ("\u2502", "#1a2a38") + + # Place pointer at azimuth position + slot = _azimuth_to_slot(self.azimuth) + pr, pc = _POINTER_SLOTS[slot] + # Use a filled diamond for the pointer + grid[pr][pc] = ("\u25c6", "#00d4aa bold") + + # Compute a line from center toward the pointer direction for visual clarity + # Place a dot at an intermediate position between center (3,5) and pointer + cr, cc = 3, 5 + dr = pr - cr + dc = pc - cc + if abs(dr) > 1 or abs(dc) > 1: + mr = cr + (1 if dr > 0 else (-1 if dr < 0 else 0)) + mc = cc + (1 if dc > 0 else (-1 if dc < 0 else 0)) + # Only place intermediate dot if it doesn't overwrite a label + existing_ch = grid[mr][mc][0] + if existing_ch in (" ", "\u2500", "\u2502", "\u253c"): + grid[mr][mc] = ("\u2022", "#00d4aa") + + # Render grid to text + for row_idx, row in enumerate(grid): + for _col_idx, (ch, style) in enumerate(row): + result.append(ch, style=style) + if row_idx < 6: + result.append("\n") + + # Bearing line below compass + bearing = self.azimuth % 360.0 + if bearing < 0: + bearing += 360.0 + cardinal = _bearing_to_cardinal(bearing) + result.append("\n") + result.append(f" {cardinal:>5s}", style="#506878") + result.append(f" {bearing:05.1f}\u00b0", style="#c8d0d8") + + return result + + def watch_azimuth(self, _value: float) -> None: + self.refresh() + + def watch_elevation(self, _value: float) -> None: + self.refresh() + + +def _bearing_to_cardinal(bearing: float) -> str: + """Convert bearing in degrees to 16-point cardinal abbreviation.""" + directions = [ + "N", + "NNE", + "NE", + "ENE", + "E", + "ESE", + "SE", + "SSE", + "S", + "SSW", + "SW", + "WSW", + "W", + "WNW", + "NW", + "NNW", + ] + idx = round(bearing / 22.5) % 16 + return directions[idx] diff --git a/tui/src/birdcage_tui/widgets/device_status_bar.py b/tui/src/birdcage_tui/widgets/device_status_bar.py new file mode 100644 index 0000000..7f72c29 --- /dev/null +++ b/tui/src/birdcage_tui/widgets/device_status_bar.py @@ -0,0 +1,92 @@ +"""Device status bar widget — sidebar display of connection state and firmware info.""" + +from rich.text import Text +from textual.widgets import Static + + +class DeviceStatusBar(Static): + """Sidebar status display showing connection state and firmware info.""" + + def __init__(self, **kwargs) -> None: + super().__init__(**kwargs) + self._connected: bool = False + self._demo: bool = False + self._firmware: str = "---" + self._submenu: str = "---" + self._port: str = "---" + + def set_device(self, device: object) -> None: + """Accept a device reference and update the status display.""" + is_demo = hasattr(device, "demo_mode") or type(device).__name__ == "DemoDevice" + fw = getattr(device, "firmware_id", "02.02.48") if device else "---" + port = getattr(self.app, "serial_port", "/dev/ttyUSB0") if self.app else "---" + submenu = getattr(device, "current_menu", "TRK>") if device else "---" + connected = device is not None and not is_demo + self.update_status( + connected=connected, + demo=is_demo, + firmware=str(fw), + submenu=str(submenu), + port=str(port), + ) + + def update_status( + self, + connected: bool, + demo: bool, + firmware: str, + submenu: str, + port: str, + ) -> None: + """Update all status fields and refresh the display.""" + self._connected = connected + self._demo = demo + self._firmware = firmware + self._submenu = submenu + self._port = port + self.refresh() + + def render(self) -> Text: + result = Text() + label_w = 8 + + # Status row + result.append("Status".ljust(label_w), style="#506878") + if self._connected: + result.append("Connected", style="#00e060 bold") + elif self._demo: + result.append("Demo", style="#e8a020 italic") + else: + result.append("Offline", style="#e04040") + result.append("\n") + + # Port row + result.append("Port".ljust(label_w), style="#506878") + result.append(self._port, style="#c8d0d8") + result.append("\n") + + # Firmware row + result.append("FW".ljust(label_w), style="#506878") + result.append(self._firmware, style="#c8d0d8") + result.append("\n") + + # Menu row + result.append("Menu".ljust(label_w), style="#506878") + # Color the menu prompt with its matching prompt color + submenu_colors: dict[str, str] = { + "TRK>": "#00d4aa", + "MOT>": "#00e060", + "DVB>": "#2080d0", + "NVS>": "#e8a020", + "A3981>": "#00b8c8", + "STEP>": "#40c0a0", + "EE>": "#e8a020", + "OS>": "#8090a0", + "ADC>": "#00b8c8", + "GPIO>": "#40c0a0", + "PEAK>": "#e8c020", + } + menu_color = submenu_colors.get(self._submenu, "#c8d0d8") + result.append(self._submenu, style=f"{menu_color} bold") + + return result diff --git a/tui/src/birdcage_tui/widgets/motor_status.py b/tui/src/birdcage_tui/widgets/motor_status.py new file mode 100644 index 0000000..637622a --- /dev/null +++ b/tui/src/birdcage_tui/widgets/motor_status.py @@ -0,0 +1,76 @@ +"""Motor status widget — engagement state, torque, step counts, and EL range.""" + +from rich.text import Text +from textual.reactive import reactive +from textual.widgets import Static + + +class MotorStatus(Static): + """Panel showing motor engagement state, torque, and step counts.""" + + engaged: reactive[bool] = reactive(False) + az_torque: reactive[str] = reactive("LOW") + el_torque: reactive[str] = reactive("LOW") + az_steps: reactive[int] = reactive(0) + el_steps: reactive[int] = reactive(0) + el_min: reactive[float] = reactive(18.0) + el_max: reactive[float] = reactive(65.0) + + def render(self) -> Text: + result = Text() + label_w = 10 + + # Engaged row + result.append("Engaged".ljust(label_w), style="#506878") + if self.engaged: + result.append("YES", style="#00e060 bold") + else: + result.append("NO", style="#e04040") + result.append("\n") + + # Torque row + result.append("Torque".ljust(label_w), style="#506878") + result.append("AZ: ", style="#506878") + az_style = "#e8c020 bold" if self.az_torque == "HIGH" else "#c8d0d8" + result.append(f"{self.az_torque}", style=az_style) + result.append(" EL: ", style="#506878") + el_style = "#e8c020 bold" if self.el_torque == "HIGH" else "#c8d0d8" + result.append(f"{self.el_torque}", style=el_style) + result.append("\n") + + # Steps row + result.append("Steps".ljust(label_w), style="#506878") + result.append("AZ: ", style="#506878") + result.append(f"{self.az_steps}", style="#c8d0d8") + result.append(" EL: ", style="#506878") + result.append(f"{self.el_steps}", style="#c8d0d8") + result.append("\n") + + # EL Range row + result.append("EL Range".ljust(label_w), style="#506878") + result.append(f"{self.el_min:.1f}\u00b0", style="#c8d0d8") + result.append(" \u2013 ", style="#506878") + result.append(f"{self.el_max:.1f}\u00b0", style="#c8d0d8") + + return result + + def watch_engaged(self, _value: bool) -> None: + self.refresh() + + def watch_az_torque(self, _value: str) -> None: + self.refresh() + + def watch_el_torque(self, _value: str) -> None: + self.refresh() + + def watch_az_steps(self, _value: int) -> None: + self.refresh() + + def watch_el_steps(self, _value: int) -> None: + self.refresh() + + def watch_el_min(self, _value: float) -> None: + self.refresh() + + def watch_el_max(self, _value: float) -> None: + self.refresh() diff --git a/tui/src/birdcage_tui/widgets/nvs_table.py b/tui/src/birdcage_tui/widgets/nvs_table.py new file mode 100644 index 0000000..2fee378 --- /dev/null +++ b/tui/src/birdcage_tui/widgets/nvs_table.py @@ -0,0 +1,93 @@ +"""NVS table widget — DataTable wrapper for non-volatile storage dump display.""" + +import re + +from textual.widgets import DataTable + +# Regex to parse NVS dump lines. +# Examples: +# 0) Log ID's 0x00000007 0x00000007 0x00000007 +# 20) Disable Tracker Proc? TRUE TRUE FALSE +# 101) Minimum Elevation Angle 18.00 18.00 18.00 +_NVS_LINE_RE = re.compile( + r"^\s*(\d+)\)\s+" # index with closing paren + r"(.+?)\s{2,}" # name (greedy until 2+ spaces) + r"(\S+)\s+" # current value + r"(\S+)\s+" # saved value + r"(\S+)\s*$" # default value +) + + +class NvsTable(DataTable): + """DataTable displaying NVS (non-volatile storage) dump data.""" + + def __init__(self, **kwargs) -> None: + super().__init__(**kwargs) + self._parsed_rows: list[dict[str, str]] = [] + self._columns_added = False + + def on_mount(self) -> None: + """Add columns when the widget is mounted.""" + if not self._columns_added: + self.add_columns("Idx", "Name", "Current", "Saved", "Default") + self._columns_added = True + + def load_nvs(self, text: str) -> list[dict[str, str]]: + """Parse NVS dump text and populate the table. + + Returns a list of dicts with keys: idx, name, current, saved, default. + Rows where current != default are marked for the screen to highlight. + """ + self.clear_table() + self._parsed_rows = [] + + for line in text.splitlines(): + line = line.rstrip() + if not line: + continue + + match = _NVS_LINE_RE.match(line) + if not match: + continue + + idx = match.group(1) + name = match.group(2).strip() + current = match.group(3) + saved = match.group(4) + default = match.group(5) + + row_data = { + "idx": idx, + "name": name, + "current": current, + "saved": saved, + "default": default, + } + self._parsed_rows.append(row_data) + + # Add row to the DataTable + modified = current != default + # Prefix the index cell to signal modification to the screen. + # The screen's CSS rule .nvs-modified handles styling. + label = f"*{idx}" if modified else idx + + self.add_row(label, name, current, saved, default, key=f"nvs-{idx}") + + return self._parsed_rows + + def clear_table(self) -> None: + """Remove all rows from the table.""" + self.clear() + self._parsed_rows = [] + + @property + def parsed_rows(self) -> list[dict[str, str]]: + """Access the most recently parsed NVS data.""" + return list(self._parsed_rows) + + @property + def modified_indices(self) -> list[str]: + """Return indices where current value differs from default.""" + return [ + row["idx"] for row in self._parsed_rows if row["current"] != row["default"] + ] diff --git a/tui/src/birdcage_tui/widgets/serial_log.py b/tui/src/birdcage_tui/widgets/serial_log.py new file mode 100644 index 0000000..55ae6c2 --- /dev/null +++ b/tui/src/birdcage_tui/widgets/serial_log.py @@ -0,0 +1,84 @@ +"""Serial log widget — RichLog with color-coded firmware console prompts.""" + +import re + +from rich.text import Text +from textual.widgets import RichLog + +# Prompt patterns and their colors, ordered by specificity (longest match first). +_PROMPT_STYLES: list[tuple[str, str]] = [ + ("A3981>", "#00b8c8"), + ("STEP>", "#40c0a0"), + ("TRK>", "#00d4aa"), + ("MOT>", "#00e060"), + ("DVB>", "#2080d0"), + ("NVS>", "#e8a020"), + ("EE>", "#e8a020"), + ("OS>", "#8090a0"), + ("ADC>", "#00b8c8"), + ("GPIO>", "#40c0a0"), + ("PEAK>", "#e8c020"), + ("LATLON>", "#506878"), + ("DIPSWITCH>", "#506878"), +] + +# Build a regex that matches any known prompt at any position in the text. +_PROMPT_PATTERN = re.compile( + r"(" + "|".join(re.escape(p) for p, _ in _PROMPT_STYLES) + r")" +) + +# Lookup dict for color by prompt string +_PROMPT_COLOR: dict[str, str] = {p: c for p, c in _PROMPT_STYLES} + + +class SerialLog(RichLog): + """RichLog that color-codes Winegard firmware console prompts.""" + + def __init__(self, **kwargs) -> None: + super().__init__(markup=False, wrap=True, **kwargs) + + def append_output(self, text: str) -> None: + """Append firmware output with color-coded prompts. + + Each line is scanned for known prompt strings (TRK>, MOT>, etc.) + which are rendered in their assigned color. All other text uses + the default terminal color. + """ + for line in text.splitlines(): + if not line: + continue + + styled = _colorize_line(line) + self.write(styled) + + def append_command(self, cmd: str) -> None: + """Append a user-issued command, formatted with a prompt indicator.""" + styled = Text() + styled.append("> ", style="#00d4aa bold") + styled.append(cmd, style="#00d4aa") + self.write(styled) + + +def _colorize_line(line: str) -> Text: + """Parse a single line and return a Rich Text with colored prompt spans.""" + result = Text() + last_end = 0 + + for match in _PROMPT_PATTERN.finditer(line): + start, end = match.span() + prompt_str = match.group(1) + color = _PROMPT_COLOR[prompt_str] + + # Text before the prompt + if start > last_end: + result.append(line[last_end:start], style="#c8d0d8") + + # The prompt itself + result.append(prompt_str, style=f"{color} bold") + last_end = end + + # Remaining text after last prompt + if last_end < len(line): + result.append(line[last_end:], style="#c8d0d8") + + return result diff --git a/tui/src/birdcage_tui/widgets/signal_gauge.py b/tui/src/birdcage_tui/widgets/signal_gauge.py new file mode 100644 index 0000000..935ff1c --- /dev/null +++ b/tui/src/birdcage_tui/widgets/signal_gauge.py @@ -0,0 +1,90 @@ +"""Signal gauge widget — horizontal RSSI bar with color-coded thresholds.""" + +from rich.text import Text +from textual.reactive import reactive +from textual.widgets import Static + +# RSSI color thresholds (upper bound, color) +_THRESHOLDS: list[tuple[int, str]] = [ + (500, "#2080d0"), # cold — noise floor + (1000, "#00b8c8"), # cool — weak signal + (2000, "#00e060"), # mid — usable + (3000, "#e8c020"), # warm — strong + (4096, "#e04040"), # hot — saturating +] + +BAR_WIDTH = 40 +MAX_RSSI = 4096 + +# Sub-character bar fragments for smooth rendering (8 levels per cell) +_BAR_CHARS = " ▏▎▍▌▋▊▉" +_FULL = "\u2588" # █ + + +def _rssi_color(rssi: int) -> str: + """Return the color string for a given RSSI value.""" + for threshold, color in _THRESHOLDS: + if rssi <= threshold: + return color + return _THRESHOLDS[-1][1] + + +class SignalGauge(Static): + """Horizontal RSSI signal strength bar gauge.""" + + rssi_avg: reactive[int] = reactive(0) + rssi_cur: reactive[int] = reactive(0) + reads: reactive[int] = reactive(0) + + def render(self) -> Text: + result = Text() + + # Title + result.append("RSSI", style="#506878 bold") + result.append("\n") + + # Compute fill with sub-character precision (8 levels per cell = 320 positions) + clamped = max(0, min(self.rssi_cur, MAX_RSSI)) + fill_frac = clamped / MAX_RSSI * BAR_WIDTH + full_cells = int(fill_frac) + partial = fill_frac - full_cells + partial_idx = int(partial * 8) + + # Build the bar with per-character color based on position thresholds + for i in range(full_cells): + pos_rssi = round((i + 0.5) / BAR_WIDTH * MAX_RSSI) + color = _rssi_color(pos_rssi) + result.append(_FULL, style=color) + + # Partial sub-character cell + remaining = BAR_WIDTH - full_cells + if remaining > 0 and partial_idx > 0: + pos_rssi = round((full_cells + 0.5) / BAR_WIDTH * MAX_RSSI) + color = _rssi_color(pos_rssi) + result.append(_BAR_CHARS[partial_idx], style=color) + remaining -= 1 + + result.append("\u2591" * remaining, style="#1a2a38") + + # Numeric value at end of bar + result.append(f" {self.rssi_cur}", style=_rssi_color(self.rssi_cur)) + result.append("\n") + + # Label line + result.append("avg: ", style="#506878") + result.append(f"{self.rssi_avg}", style="#c8d0d8") + result.append(" cur: ", style="#506878") + result.append(f"{self.rssi_cur}", style="#c8d0d8") + result.append(" reads: ", style="#506878") + result.append(f"{self.reads}", style="#c8d0d8") + + return result + + def watch_rssi_avg(self, _value: int) -> None: + self.refresh() + + def watch_rssi_cur(self, _value: int) -> None: + self.refresh() + + def watch_reads(self, _value: int) -> None: + self.refresh() diff --git a/tui/src/birdcage_tui/widgets/sky_heatmap.py b/tui/src/birdcage_tui/widgets/sky_heatmap.py new file mode 100644 index 0000000..8287920 --- /dev/null +++ b/tui/src/birdcage_tui/widgets/sky_heatmap.py @@ -0,0 +1,123 @@ +"""Sky heatmap widget — 2D AZ x EL grid colored by RSSI for sky scan visualization.""" + +from rich.text import Text +from textual.widgets import Static + +# RSSI color thresholds matching signal_gauge.py +_THRESHOLDS: list[tuple[float, str]] = [ + (500.0, "#2080d0"), + (1000.0, "#00b8c8"), + (2000.0, "#00e060"), + (3000.0, "#e8c020"), + (4096.0, "#e04040"), +] + +_ZERO_COLOR = "#0e1420" + + +def _rssi_color(rssi: float) -> str: + """Return the color string for a given RSSI value.""" + if rssi <= 0: + return _ZERO_COLOR + for threshold, color in _THRESHOLDS: + if rssi <= threshold: + return color + return _THRESHOLDS[-1][1] + + +class SkyHeatmap(Static): + """2D azimuth x elevation grid colored by RSSI for sky scan visualization.""" + + def __init__( + self, + az_bins: int = 40, + el_bins: int = 10, + **kwargs, + ) -> None: + super().__init__(**kwargs) + self._az_bins = az_bins + self._el_bins = el_bins + self._grid: list[list[float]] = [[0.0] * az_bins for _ in range(el_bins)] + self._active_az: int | None = None + self._active_el: int | None = None + + def set_point(self, az_idx: int, el_idx: int, rssi: float) -> None: + """Set RSSI value at a grid cell. Does not refresh — call refresh() explicitly + or batch updates and refresh once.""" + if 0 <= el_idx < self._el_bins and 0 <= az_idx < self._az_bins: + self._grid[el_idx][az_idx] = rssi + + def set_active(self, az_idx: int, el_idx: int) -> None: + """Highlight the current scan position and refresh.""" + self._active_az = az_idx + self._active_el = el_idx + self.refresh() + + def clear(self) -> None: + """Reset all RSSI values to zero and clear active position.""" + for row in self._grid: + for i in range(len(row)): + row[i] = 0.0 + self._active_az = None + self._active_el = None + self.refresh() + + def render(self) -> Text: + result = Text() + + # Column header: AZ labels (every 5 bins) + # Left gutter for EL labels + gutter = 5 + result.append(" " * gutter, style="#0e1420") + for az in range(self._az_bins): + if az % 5 == 0: + label = str(az) + result.append(label, style="#506878") + # Pad to maintain 1-char-per-bin spacing + pad = 1 - len(label) + if pad > 0: + result.append(" " * pad) + else: + result.append(" ") + result.append("\n") + + # Grid rows: highest EL at top + for el_idx in range(self._el_bins - 1, -1, -1): + # EL label + el_label = f"{el_idx:>3d} " + result.append(el_label, style="#506878") + result.append("\u2502", style="#1a2a38") + + for az_idx in range(self._az_bins): + rssi = self._grid[el_idx][az_idx] + is_active = az_idx == self._active_az and el_idx == self._active_el + + if is_active: + # Active scan position: bright white on dark background + result.append("\u2588", style="bold #ffffff on #1a2a38") + elif rssi <= 0: + # Empty cell + result.append("\u2591", style="#0e1420") + else: + color = _rssi_color(rssi) + # Use denser block for higher RSSI + ch = "\u2593" if rssi < 500 else "\u2588" + result.append(ch, style=color) + + if el_idx > 0: + result.append("\n") + + # Bottom border + result.append("\n") + result.append(" " * gutter, style="#0e1420") + result.append("\u2500" * self._az_bins, style="#1a2a38") + + return result + + @property + def az_bins(self) -> int: + return self._az_bins + + @property + def el_bins(self) -> int: + return self._el_bins diff --git a/tui/src/birdcage_tui/widgets/sparkline_widget.py b/tui/src/birdcage_tui/widgets/sparkline_widget.py new file mode 100644 index 0000000..89ad8f5 --- /dev/null +++ b/tui/src/birdcage_tui/widgets/sparkline_widget.py @@ -0,0 +1,74 @@ +"""Sparkline widget — rolling time series using Unicode block characters.""" + +from collections import deque + +from rich.text import Text +from textual.widgets import Static + +# 8-level vertical block characters for sparkline rendering. +# Index 0 = lowest bar, index 7 = tallest bar. +_BLOCKS = "\u2581\u2582\u2583\u2584\u2585\u2586\u2587\u2588" + + +class SparklineWidget(Static): + """Rolling sparkline time series display.""" + + def __init__( + self, + max_points: int = 60, + label: str = "", + color: str = "#00d4aa", + **kwargs, + ) -> None: + super().__init__(**kwargs) + self._max_points = max_points + self._label = label + self._color = color + self._buffer: deque[float] = deque(maxlen=max_points) + + def push(self, value: float) -> None: + """Add a data point to the sparkline buffer and refresh.""" + self._buffer.append(value) + self.refresh() + + def render(self) -> Text: + result = Text() + + # Label prefix + if self._label: + result.append(f"{self._label} ", style="#506878") + + if not self._buffer: + result.append("\u2581" * self._max_points, style="#1a2a38") + return result + + values = list(self._buffer) + lo = min(values) + hi = max(values) + span = hi - lo + + for v in values: + if span <= 0: + # All values identical — render as mid-level + idx = 3 + else: + normalized = (v - lo) / span + idx = min(int(normalized * 7.999), 7) + result.append(_BLOCKS[idx], style=self._color) + + # Pad remaining width with low blocks if buffer not full + remaining = self._max_points - len(values) + if remaining > 0: + result.append(_BLOCKS[0] * remaining, style="#1a2a38") + + # Min/max annotation + result.append("\n") + if self._label: + result.append(" " * (len(self._label) + 1)) + result.append(f"{lo:.0f}", style="#506878") + gap = self._max_points - len(f"{lo:.0f}") - len(f"{hi:.0f}") + if gap > 0: + result.append(" " * gap) + result.append(f"{hi:.0f}", style="#506878") + + return result diff --git a/tui/uv.lock b/tui/uv.lock new file mode 100644 index 0000000..7e82e23 --- /dev/null +++ b/tui/uv.lock @@ -0,0 +1,179 @@ +version = 1 +revision = 3 +requires-python = ">=3.11" + +[[package]] +name = "birdcage" +version = "2026.2.12.1" +source = { directory = "../" } +dependencies = [ + { name = "click" }, + { name = "pyserial" }, +] + +[package.metadata] +requires-dist = [ + { name = "click", specifier = ">=8.0" }, + { name = "pyserial", specifier = ">=3.5" }, +] + +[[package]] +name = "birdcage-tui" +version = "2026.2.13" +source = { editable = "." } +dependencies = [ + { name = "birdcage" }, + { name = "textual" }, +] + +[package.metadata] +requires-dist = [ + { name = "birdcage", directory = "../" }, + { name = "textual", specifier = ">=1.0.0" }, +] + +[[package]] +name = "click" +version = "8.3.1" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "colorama", marker = "sys_platform == 'win32'" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/3d/fa/656b739db8587d7b5dfa22e22ed02566950fbfbcdc20311993483657a5c0/click-8.3.1.tar.gz", hash = "sha256:12ff4785d337a1bb490bb7e9c2b1ee5da3112e94a8622f26a6c77f5d2fc6842a", size = 295065, upload-time = "2025-11-15T20:45:42.706Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/98/78/01c019cdb5d6498122777c1a43056ebb3ebfeef2076d9d026bfe15583b2b/click-8.3.1-py3-none-any.whl", hash = "sha256:981153a64e25f12d547d3426c367a4857371575ee7ad18df2a6183ab0545b2a6", size = 108274, upload-time = "2025-11-15T20:45:41.139Z" }, +] + +[[package]] +name = "colorama" +version = "0.4.6" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/d8/53/6f443c9a4a8358a93a6792e2acffb9d9d5cb0a5cfd8802644b7b1c9a02e4/colorama-0.4.6.tar.gz", hash = "sha256:08695f5cb7ed6e0531a20572697297273c47b8cae5a63ffc6d6ed5c201be6e44", size = 27697, upload-time = "2022-10-25T02:36:22.414Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/d1/d6/3965ed04c63042e047cb6a3e6ed1a63a35087b6a609aa3a15ed8ac56c221/colorama-0.4.6-py2.py3-none-any.whl", hash = "sha256:4f1d9991f5acc0ca119f9d443620b77f9d6b33703e51011c16baf57afb285fc6", size = 25335, upload-time = "2022-10-25T02:36:20.889Z" }, +] + +[[package]] +name = "linkify-it-py" +version = "2.0.3" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "uc-micro-py" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/2a/ae/bb56c6828e4797ba5a4821eec7c43b8bf40f69cda4d4f5f8c8a2810ec96a/linkify-it-py-2.0.3.tar.gz", hash = "sha256:68cda27e162e9215c17d786649d1da0021a451bdc436ef9e0fa0ba5234b9b048", size = 27946, upload-time = "2024-02-04T14:48:04.179Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/04/1e/b832de447dee8b582cac175871d2f6c3d5077cc56d5575cadba1fd1cccfa/linkify_it_py-2.0.3-py3-none-any.whl", hash = "sha256:6bcbc417b0ac14323382aef5c5192c0075bf8a9d6b41820a2b66371eac6b6d79", size = 19820, upload-time = "2024-02-04T14:48:02.496Z" }, +] + +[[package]] +name = "markdown-it-py" +version = "4.0.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "mdurl" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/5b/f5/4ec618ed16cc4f8fb3b701563655a69816155e79e24a17b651541804721d/markdown_it_py-4.0.0.tar.gz", hash = "sha256:cb0a2b4aa34f932c007117b194e945bd74e0ec24133ceb5bac59009cda1cb9f3", size = 73070, upload-time = "2025-08-11T12:57:52.854Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/94/54/e7d793b573f298e1c9013b8c4dade17d481164aa517d1d7148619c2cedbf/markdown_it_py-4.0.0-py3-none-any.whl", hash = "sha256:87327c59b172c5011896038353a81343b6754500a08cd7a4973bb48c6d578147", size = 87321, upload-time = "2025-08-11T12:57:51.923Z" }, +] + +[package.optional-dependencies] +linkify = [ + { name = "linkify-it-py" }, +] + +[[package]] +name = "mdit-py-plugins" +version = "0.5.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "markdown-it-py" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/b2/fd/a756d36c0bfba5f6e39a1cdbdbfdd448dc02692467d83816dff4592a1ebc/mdit_py_plugins-0.5.0.tar.gz", hash = "sha256:f4918cb50119f50446560513a8e311d574ff6aaed72606ddae6d35716fe809c6", size = 44655, upload-time = "2025-08-11T07:25:49.083Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/fb/86/dd6e5db36df29e76c7a7699123569a4a18c1623ce68d826ed96c62643cae/mdit_py_plugins-0.5.0-py3-none-any.whl", hash = "sha256:07a08422fc1936a5d26d146759e9155ea466e842f5ab2f7d2266dd084c8dab1f", size = 57205, upload-time = "2025-08-11T07:25:47.597Z" }, +] + +[[package]] +name = "mdurl" +version = "0.1.2" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/d6/54/cfe61301667036ec958cb99bd3efefba235e65cdeb9c84d24a8293ba1d90/mdurl-0.1.2.tar.gz", hash = "sha256:bb413d29f5eea38f31dd4754dd7377d4465116fb207585f97bf925588687c1ba", size = 8729, upload-time = "2022-08-14T12:40:10.846Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/b3/38/89ba8ad64ae25be8de66a6d463314cf1eb366222074cfda9ee839c56a4b4/mdurl-0.1.2-py3-none-any.whl", hash = "sha256:84008a41e51615a49fc9966191ff91509e3c40b939176e643fd50a5c2196b8f8", size = 9979, upload-time = "2022-08-14T12:40:09.779Z" }, +] + +[[package]] +name = "platformdirs" +version = "4.7.0" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/71/25/ccd8e88fcd16a4eb6343a8b4b9635e6f3928a7ebcd82822a14d20e3ca29f/platformdirs-4.7.0.tar.gz", hash = "sha256:fd1a5f8599c85d49b9ac7d6e450bc2f1aaf4a23f1fe86d09952fe20ad365cf36", size = 23118, upload-time = "2026-02-12T22:21:53.764Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/cb/e3/1eddccb2c39ecfbe09b3add42a04abcc3fa5b468aa4224998ffb8a7e9c8f/platformdirs-4.7.0-py3-none-any.whl", hash = "sha256:1ed8db354e344c5bb6039cd727f096af975194b508e37177719d562b2b540ee6", size = 18983, upload-time = "2026-02-12T22:21:52.237Z" }, +] + +[[package]] +name = "pygments" +version = "2.19.2" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/b0/77/a5b8c569bf593b0140bde72ea885a803b82086995367bf2037de0159d924/pygments-2.19.2.tar.gz", hash = "sha256:636cb2477cec7f8952536970bc533bc43743542f70392ae026374600add5b887", size = 4968631, upload-time = "2025-06-21T13:39:12.283Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/c7/21/705964c7812476f378728bdf590ca4b771ec72385c533964653c68e86bdc/pygments-2.19.2-py3-none-any.whl", hash = "sha256:86540386c03d588bb81d44bc3928634ff26449851e99741617ecb9037ee5ec0b", size = 1225217, upload-time = "2025-06-21T13:39:07.939Z" }, +] + +[[package]] +name = "pyserial" +version = "3.5" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/1e/7d/ae3f0a63f41e4d2f6cb66a5b57197850f919f59e558159a4dd3a818f5082/pyserial-3.5.tar.gz", hash = "sha256:3c77e014170dfffbd816e6ffc205e9842efb10be9f58ec16d3e8675b4925cddb", size = 159125, upload-time = "2020-11-23T03:59:15.045Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/07/bc/587a445451b253b285629263eb51c2d8e9bcea4fc97826266d186f96f558/pyserial-3.5-py2.py3-none-any.whl", hash = "sha256:c4451db6ba391ca6ca299fb3ec7bae67a5c55dde170964c7a14ceefec02f2cf0", size = 90585, upload-time = "2020-11-23T03:59:13.41Z" }, +] + +[[package]] +name = "rich" +version = "14.3.2" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "markdown-it-py" }, + { name = "pygments" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/74/99/a4cab2acbb884f80e558b0771e97e21e939c5dfb460f488d19df485e8298/rich-14.3.2.tar.gz", hash = "sha256:e712f11c1a562a11843306f5ed999475f09ac31ffb64281f73ab29ffdda8b3b8", size = 230143, upload-time = "2026-02-01T16:20:47.908Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/ef/45/615f5babd880b4bd7d405cc0dc348234c5ffb6ed1ea33e152ede08b2072d/rich-14.3.2-py3-none-any.whl", hash = "sha256:08e67c3e90884651da3239ea668222d19bea7b589149d8014a21c633420dbb69", size = 309963, upload-time = "2026-02-01T16:20:46.078Z" }, +] + +[[package]] +name = "textual" +version = "7.5.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "markdown-it-py", extra = ["linkify"] }, + { name = "mdit-py-plugins" }, + { name = "platformdirs" }, + { name = "pygments" }, + { name = "rich" }, + { name = "typing-extensions" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/9f/38/7d169a765993efde5095c70a668bf4f5831bb7ac099e932f2783e9b71abf/textual-7.5.0.tar.gz", hash = "sha256:c730cba1e3d704e8f1ca915b6a3af01451e3bca380114baacf6abf87e9dac8b6", size = 1592319, upload-time = "2026-01-30T13:46:39.881Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/9c/78/96ddb99933e11d91bc6e05edae23d2687e44213066bcbaca338898c73c47/textual-7.5.0-py3-none-any.whl", hash = "sha256:849dfee9d705eab3b2d07b33152b7bd74fb1f5056e002873cc448bce500c6374", size = 718164, upload-time = "2026-01-30T13:46:37.635Z" }, +] + +[[package]] +name = "typing-extensions" +version = "4.15.0" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/72/94/1a15dd82efb362ac84269196e94cf00f187f7ed21c242792a923cdb1c61f/typing_extensions-4.15.0.tar.gz", hash = "sha256:0cea48d173cc12fa28ecabc3b837ea3cf6f38c6d1136f85cbaaf598984861466", size = 109391, upload-time = "2025-08-25T13:49:26.313Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/18/67/36e9267722cc04a6b9f15c7f3441c2363321a3ea07da7ae0c0707beb2a9c/typing_extensions-4.15.0-py3-none-any.whl", hash = "sha256:f0fa19c6845758ab08074a0cfa8b7aecb71c999ca73d62883bc25cc018c4e548", size = 44614, upload-time = "2025-08-25T13:49:24.86Z" }, +] + +[[package]] +name = "uc-micro-py" +version = "1.0.3" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/91/7a/146a99696aee0609e3712f2b44c6274566bc368dfe8375191278045186b8/uc-micro-py-1.0.3.tar.gz", hash = "sha256:d321b92cff673ec58027c04015fcaa8bb1e005478643ff4a500882eaab88c48a", size = 6043, upload-time = "2024-02-09T16:52:01.654Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/37/87/1f677586e8ac487e29672e4b17455758fce261de06a0d086167bb760361a/uc_micro_py-1.0.3-py3-none-any.whl", hash = "sha256:db1dffff340817673d7b466ec86114a9dc0e9d4d9b5ba229d9d60e5c12600cd5", size = 6229, upload-time = "2024-02-09T16:52:00.371Z" }, +]