Add Birdcage TUI: 5-screen Textual interface for Carryout G2

F1 Position (compass rose, motor control, sparklines),
F2 Signal (RSSI gauge with sub-char precision, DVB/ADC sparklines, LNA toggle),
F3 Scan (AZ/EL grid sweep with heatmap and CSV export),
F4 System (NVS table, A3981 diagnostics, motor dynamics),
F5 Console (raw serial terminal with prompt detection and safety gates).

Includes SerialBridge (thread-safe protocol wrapper), DemoDevice
(synthetic simulation for --demo mode), dark RF theme with rounded
borders and teal accents, and send_raw() on CarryoutG2Protocol.
This commit is contained in:
Ryan Malloy 2026-02-13 08:53:03 -07:00
parent a70b9b0a29
commit 7271b53c63
23 changed files with 4160 additions and 0 deletions

View File

@ -392,6 +392,10 @@ class CarryoutG2Protocol(FirmwareProtocol):
raise ValueError(f"Could not parse RSSI from: {response!r}") raise ValueError(f"Could not parse RSSI from: {response!r}")
def send_raw(self, cmd: str) -> str:
"""Send arbitrary command, return raw prompt-terminated response."""
return self._send(cmd)
def quit_submenu(self) -> None: def quit_submenu(self) -> None:
"""Exit current submenu and return to parent.""" """Exit current submenu and return to parent."""
self._send("q") self._send("q")

31
tui/pyproject.toml Normal file
View File

@ -0,0 +1,31 @@
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[project]
name = "birdcage-tui"
version = "2026.02.13"
description = "Textual TUI for Winegard Carryout G2 satellite dish control"
license = "MIT"
requires-python = ">=3.11"
authors = [{name = "Ryan Malloy", email = "ryan@supported.systems"}]
dependencies = [
"birdcage",
"textual>=1.0.0",
]
[project.scripts]
birdcage-tui = "birdcage_tui.app:main"
[tool.uv.sources]
birdcage = { path = ".." }
[tool.ruff]
target-version = "py311"
src = ["src"]
[tool.ruff.lint]
select = ["E", "F", "I", "UP", "B", "SIM"]
[tool.hatch.build.targets.wheel]
packages = ["src/birdcage_tui"]

View File

@ -0,0 +1 @@
"""Birdcage TUI — Textual interface for Winegard satellite dish control."""

165
tui/src/birdcage_tui/app.py Normal file
View File

@ -0,0 +1,165 @@
"""Birdcage TUI — main application shell.
ContentSwitcher-based layout with sidebar navigation (F1-F5),
device status bar, and five swappable screen panels.
"""
import argparse
import logging
from textual.app import App, ComposeResult
from textual.binding import Binding
from textual.containers import Horizontal, Vertical
from textual.widgets import Button, ContentSwitcher, Footer, Header, Static
from birdcage_tui.screens.console import ConsoleScreen
from birdcage_tui.screens.position import PositionScreen
from birdcage_tui.screens.scan import ScanScreen
from birdcage_tui.screens.signal import SignalScreen
from birdcage_tui.screens.system import SystemScreen
from birdcage_tui.widgets.device_status_bar import DeviceStatusBar
log = logging.getLogger(__name__)
MODES: dict[str, tuple[str, type]] = {
"position": ("F1 Position", PositionScreen),
"signal": ("F2 Signal", SignalScreen),
"scan": ("F3 Scan", ScanScreen),
"system": ("F4 System", SystemScreen),
"console": ("F5 Console", ConsoleScreen),
}
class BirdcageApp(App):
"""Textual application for Winegard satellite dish control."""
TITLE = "Birdcage"
CSS_PATH = "theme.tcss"
BINDINGS = [
Binding("f1", "switch_mode('position')", "Position"),
Binding("f2", "switch_mode('signal')", "Signal"),
Binding("f3", "switch_mode('scan')", "Scan"),
Binding("f4", "switch_mode('system')", "System"),
Binding("f5", "switch_mode('console')", "Console"),
Binding("q", "quit", "Quit"),
Binding("d", "toggle_dark", "Dark"),
]
# Set from CLI args before run()
demo_mode: bool = False
serial_port: str = "/dev/ttyUSB0"
firmware_name: str = "g2"
skip_init: bool = False
device: object = None
@property
def SUB_TITLE(self) -> str: # noqa: N802
if self.demo_mode:
return "DEMO"
return self.serial_port
def compose(self) -> ComposeResult:
yield Header()
with Horizontal(id="main-area"):
with Vertical(id="sidebar"):
yield Static("\U0001f6f0\ufe0f Birdcage", classes="sidebar-title")
yield Static("Carryout G2", classes="sidebar-subtitle")
for mode_key, (label, _) in MODES.items():
yield Button(label, id=f"btn-{mode_key}", classes="sidebar-btn")
yield DeviceStatusBar(id="device-status")
with ContentSwitcher(id="content-area", initial="position"):
for mode_key, (_, screen_cls) in MODES.items():
yield screen_cls(id=mode_key)
yield Footer()
def on_mount(self) -> None:
self.query_one("#btn-position").add_class("active")
self._setup_device()
def _setup_device(self) -> None:
"""Create device (demo or real) and hand it to each screen."""
if self.demo_mode:
from birdcage_tui.demo import DemoDevice
self.device = DemoDevice()
self.device.connect()
else:
from birdcage.protocol import get_protocol
from birdcage_tui.bridge import SerialBridge
protocol = get_protocol(self.firmware_name)
self.device = SerialBridge(protocol)
self.device.connect(self.serial_port)
if not self.skip_init:
self.run_worker(self._initialize_device, thread=True)
self._distribute_device()
async def _initialize_device(self) -> None:
"""Run device init in a worker thread (blocks on serial I/O)."""
try:
self.device.initialize()
except Exception:
log.exception("Device initialization failed")
self.notify("Init failed -- check serial connection", severity="error")
def _distribute_device(self) -> None:
"""Pass the device reference to every screen that wants it."""
for mode_key in MODES:
screen = self.query_one(f"#{mode_key}")
if hasattr(screen, "set_device"):
screen.set_device(self.device)
status_bar = self.query_one("#device-status", DeviceStatusBar)
if hasattr(status_bar, "set_device"):
status_bar.set_device(self.device)
def action_switch_mode(self, mode: str) -> None:
"""Switch the content area to *mode* and update sidebar highlight."""
switcher = self.query_one("#content-area", ContentSwitcher)
switcher.current = mode
for btn in self.query(".sidebar-btn"):
btn.remove_class("active")
self.query_one(f"#btn-{mode}").add_class("active")
screen = self.query_one(f"#{mode}")
if hasattr(screen, "on_show"):
screen.on_show()
def action_toggle_dark(self) -> None:
self.dark = not self.dark
def on_button_pressed(self, event: Button.Pressed) -> None:
button_id = event.button.id or ""
if button_id.startswith("btn-"):
mode = button_id.removeprefix("btn-")
if mode in MODES:
self.action_switch_mode(mode)
def main() -> None:
parser = argparse.ArgumentParser(
description="Birdcage TUI -- Satellite Dish Control"
)
parser.add_argument("--demo", action="store_true", help="Run with simulated device")
parser.add_argument("--port", default="/dev/ttyUSB0", help="Serial port")
parser.add_argument(
"--firmware",
default="g2",
choices=["g2", "hal205", "hal000"],
help="Firmware version",
)
parser.add_argument(
"--skip-init", action="store_true", help="Skip firmware initialization"
)
args = parser.parse_args()
app = BirdcageApp()
app.demo_mode = args.demo
app.serial_port = args.port
app.firmware_name = args.firmware
app.skip_init = args.skip_init
app.run()

View File

@ -0,0 +1,439 @@
"""Thread-safe bridge between Birdcage TUI and CarryoutG2Protocol.
Wraps all serial I/O in a threading.Lock so the TUI's worker threads
don't stomp on each other. Tracks the current firmware submenu to
minimize unnecessary q-then-reenter transitions.
"""
import contextlib
import logging
import re
import threading
from enum import Enum, auto
from birdcage.protocol import CarryoutG2Protocol
logger = logging.getLogger(__name__)
class Menu(Enum):
"""Firmware submenu states."""
ROOT = auto()
MOT = auto()
DVB = auto()
NVS = auto()
A3981 = auto()
ADC = auto()
OS = auto()
STEP = auto()
PEAK = auto()
EEPROM = auto()
GPIO = auto()
LATLON = auto()
DIPSWITCH = auto()
UNKNOWN = auto()
# Map Menu enum to the command that enters it from root.
_MENU_COMMANDS: dict[Menu, str] = {
Menu.MOT: "mot",
Menu.DVB: "dvb",
Menu.NVS: "nvs",
Menu.A3981: "a3981",
Menu.ADC: "adc",
Menu.OS: "os",
Menu.STEP: "step",
Menu.PEAK: "peak",
Menu.EEPROM: "eeprom",
Menu.GPIO: "gpio",
Menu.LATLON: "latlon",
Menu.DIPSWITCH: "dipswitch",
}
class SerialBridge:
"""Thread-safe wrapper around CarryoutG2Protocol for TUI consumption.
All public methods acquire a lock before touching the serial port.
The bridge tracks the current firmware submenu so it can skip
redundant quit-and-reenter cycles.
"""
def __init__(self, protocol: CarryoutG2Protocol) -> None:
self._proto = protocol
self._lock = threading.Lock()
self._menu = Menu.UNKNOWN
self._connected = False
# ------------------------------------------------------------------
# Internal helpers
# ------------------------------------------------------------------
def _send(self, cmd: str) -> str:
"""Send a command via the protocol's prompt-terminated send.
Caller must hold ``_lock``.
"""
return self._proto.send_raw(cmd)
def _go_to_root(self) -> None:
"""Return to TRK> root menu. Caller must hold ``_lock``."""
self._proto.reset_to_root()
self._menu = Menu.ROOT
def _ensure_menu(self, target: Menu) -> None:
"""Navigate to *target* submenu if not already there.
Caller must hold ``_lock``.
"""
if self._menu == target:
return
# Always go back to root first — we don't know how to go
# directly between arbitrary submenus.
if self._menu != Menu.ROOT:
self._go_to_root()
if target == Menu.ROOT:
return
cmd = _MENU_COMMANDS.get(target)
if cmd is None:
raise ValueError(f"No entry command for menu {target!r}")
self._send(cmd)
self._menu = target
# ------------------------------------------------------------------
# Connection
# ------------------------------------------------------------------
def connect(self, port: str, baudrate: int = 115200) -> None:
"""Open the RS-422 serial connection."""
with self._lock:
self._proto.connect(port, baudrate)
self._connected = True
self._menu = Menu.UNKNOWN
def disconnect(self) -> None:
"""Close the serial connection."""
with self._lock:
with contextlib.suppress(Exception):
self._go_to_root()
self._proto.disconnect()
self._connected = False
self._menu = Menu.UNKNOWN
@property
def is_connected(self) -> bool:
return self._connected and self._proto.is_connected
def initialize(self, skip_init: bool = False) -> None:
"""Prepare the dish for motor commands.
Args:
skip_init: If True, skip the protocol initialize step
(useful when re-connecting to an already-running dish).
"""
with self._lock:
if not skip_init:
self._proto.initialize()
self._menu = Menu.MOT # initialize() ends in MOT>
# ------------------------------------------------------------------
# Motor (MOT>)
# ------------------------------------------------------------------
def get_position(self) -> dict[str, float]:
"""Query current AZ/EL position.
Returns:
``{"azimuth": float, "elevation": float}``
"""
with self._lock:
self._ensure_menu(Menu.MOT)
response = self._send("a")
az_m = re.search(r"Angle\[0\]\s*=\s*(-?\d+\.?\d*)", response)
el_m = re.search(r"Angle\[1\]\s*=\s*(-?\d+\.?\d*)", response)
if not az_m or not el_m:
raise ValueError(f"Could not parse position: {response!r}")
return {
"azimuth": float(az_m.group(1)),
"elevation": float(el_m.group(1)),
}
def move_to(self, az: float, el: float) -> None:
"""Move the dish to an absolute AZ/EL position."""
with self._lock:
self._ensure_menu(Menu.MOT)
self._send(f"a 0 {az}")
self._send(f"a 1 {el}")
def move_motor(self, motor_id: int, degrees: float) -> None:
"""Move a single motor to an absolute position."""
with self._lock:
self._ensure_menu(Menu.MOT)
self._send(f"a {motor_id} {degrees}")
def home_motor(self, motor_id: int) -> None:
"""Home a motor to its reference position."""
with self._lock:
self._ensure_menu(Menu.MOT)
self._send(f"h {motor_id}")
def engage(self) -> None:
"""Engage (energize) the stepper motors."""
with self._lock:
self._ensure_menu(Menu.MOT)
self._send("e")
def release(self) -> None:
"""Release (de-energize) the stepper motors."""
with self._lock:
self._ensure_menu(Menu.MOT)
self._send("r")
def get_motor_list(self) -> str:
"""List motors and their state."""
with self._lock:
self._ensure_menu(Menu.MOT)
return self._send("l")
def get_motor_dynamics(self) -> dict[str, float]:
"""Read max velocity and acceleration for both axes.
Returns:
``{"az_max_vel": float, "el_max_vel": float,
"az_accel": float, "el_accel": float}``
"""
with self._lock:
self._ensure_menu(Menu.MOT)
mv_resp = self._send("mv")
ma_resp = self._send("ma")
result: dict[str, float] = {
"az_max_vel": 0.0,
"el_max_vel": 0.0,
"az_accel": 0.0,
"el_accel": 0.0,
}
# mv → "Max Vel [0] = 65.0 Max Vel [1] = 45.0" or similar
vel_matches = re.findall(r"Max Vel\s*\[(\d)\]\s*=\s*(-?\d+\.?\d*)", mv_resp)
for motor_id, val in vel_matches:
if motor_id == "0":
result["az_max_vel"] = float(val)
elif motor_id == "1":
result["el_max_vel"] = float(val)
# ma → "Accel[0] = 400.0 Accel[1] = 400.0"
acc_matches = re.findall(r"Accel\[(\d)\]\s*=\s*(-?\d+\.?\d*)", ma_resp)
for motor_id, val in acc_matches:
if motor_id == "0":
result["az_accel"] = float(val)
elif motor_id == "1":
result["el_accel"] = float(val)
return result
def get_motor_life(self) -> str:
"""Read motor lifetime / usage statistics."""
with self._lock:
self._ensure_menu(Menu.MOT)
return self._send("life")
def get_el_limits(self) -> dict[str, float]:
"""Read elevation min, max, and home angles.
Firmware returns centidegrees: ``Min: 1800 Max: 6500 Home: 6500``
Returns:
``{"min": 18.0, "max": 65.0, "home": 65.0}``
"""
with self._lock:
self._ensure_menu(Menu.MOT)
response = self._send("elminmaxhome")
result: dict[str, float] = {"min": 0.0, "max": 0.0, "home": 0.0}
min_m = re.search(r"Min:\s*(\d+)", response)
max_m = re.search(r"Max:\s*(\d+)", response)
home_m = re.search(r"Home:\s*(\d+)", response)
if min_m:
result["min"] = int(min_m.group(1)) / 100.0
if max_m:
result["max"] = int(max_m.group(1)) / 100.0
if home_m:
result["home"] = int(home_m.group(1)) / 100.0
return result
def get_step_positions(self) -> dict[str, int]:
"""Read raw step positions for both axes.
Firmware returns: ``Position[0] = 19998 Position[1] = 3116``
Returns:
``{"az_steps": int, "el_steps": int}``
"""
with self._lock:
self._ensure_menu(Menu.MOT)
response = self._send("p")
result: dict[str, int] = {"az_steps": 0, "el_steps": 0}
matches = re.findall(r"Position\[(\d)\]\s*=\s*(-?\d+)", response)
for motor_id, val in matches:
if motor_id == "0":
result["az_steps"] = int(val)
elif motor_id == "1":
result["el_steps"] = int(val)
return result
# ------------------------------------------------------------------
# Signal (DVB>)
# ------------------------------------------------------------------
def get_rssi(self, iterations: int = 10) -> dict[str, int]:
"""Read averaged RSSI signal strength.
Returns:
``{"reads": int, "average": int, "current": int}``
"""
with self._lock:
self._ensure_menu(Menu.DVB)
response = self._send(f"rssi {iterations}")
match = re.search(
r"Reads:(\d+)\s+RSSI\[avg:\s*(\d+)\s+cur:\s*(\d+)\]",
response,
)
if match:
return {
"reads": int(match.group(1)),
"average": int(match.group(2)),
"current": int(match.group(3)),
}
raise ValueError(f"Could not parse RSSI: {response!r}")
def enable_lna(self) -> None:
"""Enable LNA in ODU mode (sets LNB to 13V)."""
with self._lock:
self._ensure_menu(Menu.DVB)
self._send("lnbdc odu")
def get_lock_status(self) -> str:
"""Read quick lock status (single-shot)."""
with self._lock:
self._ensure_menu(Menu.DVB)
return self._send("qls")
def get_dvb_config(self) -> str:
"""Read BCM hardware/firmware version."""
with self._lock:
self._ensure_menu(Menu.DVB)
return self._send("config")
def get_channel_params(self) -> str:
"""Read current channel parameters."""
with self._lock:
self._ensure_menu(Menu.DVB)
return self._send("dis")
# ------------------------------------------------------------------
# A3981
# ------------------------------------------------------------------
def get_a3981_diag(self) -> str:
"""Read A3981 diagnostic/fault status."""
with self._lock:
self._ensure_menu(Menu.A3981)
return self._send("diag")
def get_a3981_modes(self) -> dict[str, str]:
"""Read A3981 step mode, current mode, and step size.
Returns:
``{"step_mode": str, "current_mode": str, "step_size": str}``
"""
with self._lock:
self._ensure_menu(Menu.A3981)
sm_resp = self._send("sm")
cm_resp = self._send("cm")
ss_resp = self._send("ss")
return {
"step_mode": sm_resp,
"current_mode": cm_resp,
"step_size": ss_resp,
}
def get_a3981_torque(self) -> str:
"""Read A3981 torque levels."""
with self._lock:
self._ensure_menu(Menu.A3981)
return self._send("st")
# ------------------------------------------------------------------
# NVS
# ------------------------------------------------------------------
def nvs_dump(self) -> str:
"""Dump all NVS values."""
with self._lock:
self._ensure_menu(Menu.NVS)
return self._send("d")
def nvs_read(self, index: int) -> str:
"""Read a single NVS value by index."""
with self._lock:
self._ensure_menu(Menu.NVS)
return self._send(f"e {index}")
# ------------------------------------------------------------------
# ADC
# ------------------------------------------------------------------
def get_adc_rssi(self) -> str:
"""Read single-shot ADC RSSI value."""
with self._lock:
self._ensure_menu(Menu.ADC)
return self._send("rssi")
def get_board_id(self) -> str:
"""Read board identification string."""
with self._lock:
self._ensure_menu(Menu.ADC)
return self._send("bdid")
# ------------------------------------------------------------------
# OS
# ------------------------------------------------------------------
def get_firmware_id(self) -> str:
"""Read full MCU and firmware identification."""
with self._lock:
self._ensure_menu(Menu.OS)
return self._send("id")
# ------------------------------------------------------------------
# Raw / Console
# ------------------------------------------------------------------
def send_raw(self, cmd: str) -> str:
"""Send an arbitrary command and return the raw response.
After a raw command, the menu state is marked UNKNOWN because
the user may have navigated to a different submenu.
"""
with self._lock:
response = self._send(cmd)
self._menu = Menu.UNKNOWN
return response

View File

@ -0,0 +1,631 @@
"""Synthetic demo device for the Birdcage TUI.
Drop-in replacement for SerialBridge that simulates a Winegard Carryout G2
dish with motor movement, RSSI signal modeling, and canned firmware responses.
No serial hardware required.
"""
import contextlib
import math
import random
import time
from enum import Enum, auto
class _DemoMenu(Enum):
"""Simulated firmware submenu states."""
ROOT = auto()
MOT = auto()
DVB = auto()
NVS = auto()
A3981 = auto()
ADC = auto()
OS = auto()
STEP = auto()
PEAK = auto()
EEPROM = auto()
GPIO = auto()
LATLON = auto()
DIPSWITCH = auto()
# Complete NVS dump text from firmware 02.02.48 (captured 2026-02-12).
_NVS_DUMP_TEXT = """\
Num Name Current Saved Default
---- -------------------------- ---------- ---------- ----------
0) Log ID's 0x00000007 0x00000007 0x00000007
1) Log Device 0x00000001 0x00000001 0x00000001
2) Debug 2nd Console Port 0 0 0
3) Debug 2nd Packet Port 0 0 0
4) Debug Port Connection 0 0 0
16) Pitch Deadband 0.00 0.00 0.00
17) Roll Deadband 0.00 0.00 0.00
18) Yaw Deadband 0.00 0.00 0.00
20) Disable Tracker Proc? TRUE TRUE FALSE
21) Tracker Proc Run Mode 0 0 0
22) Conical Alpha Az 200 200 200
23) Conical Alpha El 200 200 200
24) Conical Radius 1.00 1.00 1.00
25) Conical Count Max 20 20 20
26) Conical Test Drift +0 +0 +0
27) Circle RPM 120 120 120
28) Circle Pts/Rev 6 6 6
32) Conical Az Clamp 8.00 8.00 8.00
33) Conical El Clamp 8.00 8.00 8.00
35) Motor Pts/Rev 72 72 72
36) Circle Az Radius 1.00 1.00 1.00
37) Circle El Radius 1.00 1.00 1.00
38) Sleep Mode Timer Secs 420 420 420
40) Motor Type 0 0 0
41) Satellite Scan Velocity 55.00 55.00 55.00
48) Motor Spiral Velocity 55.00 55.00 55.00
49) Motor Gear Ratio 0x00000000 0x00000000 0x00000000
63) GPS Heading Threshold 1.00 1.00 1.00
64) GPS Moving Threshold 5.00 MPH 5.00 MPH 5.00 MPH
66) Spiral Signal In A Row Min +3 +3 +3
67) Spiral Signal In A Row Max +20 +20 +20
68) Signal Odd to Even Offset +0 +0 +0
69) Signal Offset 80 80 80
70) Signal Baseline Angle 65.00 65.00 65.00
71) Signal Re-Peak Degrade Percent 25 25 25
72) Gyro Sensitivity +1110 +1110 +1110
73) Gyro Filter Size +1 +1 +1
74) Gyro Calib Readings 100 100 100
75) Gyro Mount Type 1 1 1
76) Gyro Velocity Offset 4 4 4
77) Gyro Max Accel 600 600 600
80) AZ Max Vel 65.00 65.00 65.00
81) AZ Max Accel 400.00 400.00 400.00
82) AZ Home Velocity 55.00 55.00 55.00
83) AZ Steps/Rev 40000 40000 40000
84) AZ Direction +1 +1 +1
85) EL Max Vel 45.00 45.00 45.00
86) EL Max Accel 400.00 400.00 400.00
87) EL Home Velocity 45.00 45.00 45.00
88) EL Steps/Rev 24960 24960 24960
89) EL Direction +1 +1 +1
95) AZ Low current limit 0x0000ff0c 0x0000ff0c 0x0000ff0c
96) AZ High current limit 0x0000ff30 0x0000ff30 0x0000ff30
97) EL Low current limit 0x0000ff0c 0x0000ff0c 0x0000ff0c
98) EL High current limit 0x0000ff40 0x0000ff40 0x0000ff40
101) Minimum Elevation Angle 18.00 18.00 18.00
102) Maximum Elevation Angle 65.00 65.00 65.00
103) Elevation Home Angle 65.00 65.00 65.00
106) Az Stall Detect 78 78 78
107) El Stall Detect 75 75 75
108) Az Stall Samples 100 100 100
109) El Stall Samples 100 100 100
110) EL Home Current Limit 0x0000ff28 0x0000ff28 0x0000ff28
111) AZ Home Current Limit 0x0000ff40 0x0000ff40 0x0000ff40
112) Disable Dipswitch? FALSE FALSE FALSE
113) Dipswitch Value 101 101 101
114) Dipswitch Front/Rear Mount 0 0 0
115) Mount Offset Angle +0 +0 +0
118) Signal Use LNB Clamp FALSE FALSE FALSE
128) AZ PID Kp +600 +600 +600
129) AZ PID Kv +60 +60 +60
130) AZ PID Ki +1 +1 +1
131) EL PID Kp +250 +250 +250
132) EL PID Kv +50 +50 +50
133) EL PID Ki +1 +1 +1
136) AZ PWM Stall Cnt 6 6 6
137) EL PWM Stall Cnt 5 5 5
143) Tracking Number 0 0 0"""
# Parse NVS lines into a dict keyed by index for nvs_read().
_NVS_LINES: dict[int, str] = {}
for _line in _NVS_DUMP_TEXT.splitlines():
_line_stripped = _line.strip()
if _line_stripped and _line_stripped[0].isdigit():
_idx_str = _line_stripped.split(")")[0].strip()
with contextlib.suppress(ValueError):
_NVS_LINES[int(_idx_str)] = _line_stripped
# Firmware identification text matching ``os > id`` output.
_FIRMWARE_ID = """\
NVS Version: 1.02.13
System ID: TWELINCH
K60-144pin
Silicon Rev 2.4
Mask Set 4N22D
512 kBytes of P-flash
P-flash only
128 kBytes of RAM
Board Rev ID: A
Board ID: STATIONARY
Ant ID: 12-IN G2
Software version: 02.02.48
CCLK: 96000000
BCLK: 48000000
Flash Base Address: 65536
Flash Size: 458752"""
_DVB_CONFIG = """\
BCM Hardware= ID: 0x4515 VER: 0xB0
BCM Firmware= MAJOR VER: 0x71 (113) MINOR VER: 0x25 (37)
BCM Strap Config: 0x25018"""
_CHANNEL_PARAMS = """\
Power Mode: ON
Search Transponders: ON
Auto Search Mode: 1
Shuffle Mode: ON
Frequency List: Non-Stacked
Num Parameter Current Default
1 Frequency 1090640 (kHz) 974000 (kHz)
2 Symbol Rate 0 (PeakScanEnabled) 20000 (ksps)
3 Trans_Mod_CRate blind_scan blind_scan
4 Blind Scan Mode ___trb_dvb_dss_____ ___trb_dvb_dss_____
5 LNB Polarity ODU:13V ---
6 LNB Tone (ODU) off off
7 Roll-off 0.35 0.35
8 LPF Cutoff 0 (auto) 0 (MHz)
9 Carrier Offset 0 (kHz) 0 (kHz)
10 FreqSearchRange 5000 (kHz) 5000 (kHz)
11 DCII Mode dcii_qpsk_comb dcii_qpsk_comb
12 Spectral Inv scan scan
13 PScnSymRtRngMin 18000 (ksps) 18000 (ksps)
14 PScnSymRtRngMax 24000 (ksps) 24000 (ksps)
15 SignalDetectMode off off"""
_MOTOR_LIFE = """\
AZ total moves: 847
AZ total degrees: 52340.50
EL total moves: 423
EL total degrees: 18920.75
Uptime hours: 312.4"""
# Simulated satellite at AZ=200, EL=38 for RSSI modeling.
_SAT_AZ = 200.0
_SAT_EL = 38.0
_RSSI_NOISE_FLOOR = 500
_RSSI_PEAK = 2000
_RSSI_BEAM_WIDTH = 50.0 # Gaussian denominator (degrees squared)
# Motor simulation speed (degrees per second).
_MOTOR_SPEED = 10.0
class DemoDevice:
"""Synthetic demo device implementing the same interface as SerialBridge.
Simulates a Carryout G2 dish with motor movement, RSSI signal modeling,
and canned firmware responses. No serial hardware required.
"""
def __init__(self) -> None:
self._connected = False
self._engaged = True
# Current position and movement targets.
self._az = 180.0
self._el = 45.0
self._target_az = 180.0
self._target_el = 45.0
self._last_move_time = time.monotonic()
# Submenu tracking for console simulation.
self._menu = _DemoMenu.ROOT
# ------------------------------------------------------------------
# Internal helpers
# ------------------------------------------------------------------
def _update_position(self) -> None:
"""Interpolate position toward target at ~10 deg/s."""
now = time.monotonic()
dt = now - self._last_move_time
self._last_move_time = now
max_step = _MOTOR_SPEED * dt
for axis in ("az", "el"):
current = getattr(self, f"_{axis}")
target = getattr(self, f"_target_{axis}")
delta = target - current
if abs(delta) < 0.001:
continue
if abs(delta) <= max_step:
# Arrived — add a tiny settling noise.
noise = random.gauss(0.0, 0.02)
setattr(self, f"_{axis}", target + noise)
else:
direction = 1.0 if delta > 0 else -1.0
noise = random.gauss(0.0, 0.02)
setattr(self, f"_{axis}", current + direction * max_step + noise)
def _compute_rssi(self) -> float:
"""Gaussian signal model centered on the simulated satellite."""
self._update_position()
dist_sq = (self._az - _SAT_AZ) ** 2 + (self._el - _SAT_EL) ** 2
signal = _RSSI_PEAK * math.exp(-dist_sq / _RSSI_BEAM_WIDTH)
drift = math.sin(time.monotonic() / 60.0) * 50.0
return _RSSI_NOISE_FLOOR + signal + drift
@property
def _is_moving(self) -> bool:
return (
abs(self._az - self._target_az) > 0.05
or abs(self._el - self._target_el) > 0.05
)
# ------------------------------------------------------------------
# Connection
# ------------------------------------------------------------------
def connect(self, port: str = "/dev/demo", baudrate: int = 115200) -> None:
self._connected = True
self._menu = _DemoMenu.ROOT
def disconnect(self) -> None:
self._connected = False
self._menu = _DemoMenu.ROOT
@property
def is_connected(self) -> bool:
return self._connected
def initialize(self, skip_init: bool = False) -> None:
self._connected = True
self._menu = _DemoMenu.MOT
# ------------------------------------------------------------------
# Motor (MOT>)
# ------------------------------------------------------------------
def get_position(self) -> dict[str, float]:
self._update_position()
return {
"azimuth": round(self._az, 2),
"elevation": round(self._el, 2),
}
def move_to(self, az: float, el: float) -> None:
self._target_az = az
self._target_el = el
self._last_move_time = time.monotonic()
def move_motor(self, motor_id: int, degrees: float) -> None:
if motor_id == 0:
self._target_az = degrees
elif motor_id == 1:
self._target_el = degrees
self._last_move_time = time.monotonic()
def home_motor(self, motor_id: int) -> None:
if motor_id == 0:
self._target_az = 0.0
elif motor_id == 1:
self._target_el = 65.0
self._last_move_time = time.monotonic()
def engage(self) -> None:
self._engaged = True
def release(self) -> None:
self._engaged = False
def get_motor_list(self) -> str:
return "Motors:\n 0 - AZIMUTH: local\n 1 - ELEVATION: local"
def get_motor_dynamics(self) -> dict[str, float]:
return {
"az_max_vel": 65.0,
"el_max_vel": 45.0,
"az_accel": 400.0,
"el_accel": 400.0,
}
def get_motor_life(self) -> str:
return _MOTOR_LIFE
def get_el_limits(self) -> dict[str, float]:
return {"min": 18.0, "max": 65.0, "home": 65.0}
def get_step_positions(self) -> dict[str, int]:
self._update_position()
return {
"az_steps": int(self._az * 40000 / 360),
"el_steps": int(self._el * 24960 / 360),
}
# ------------------------------------------------------------------
# Signal (DVB>)
# ------------------------------------------------------------------
def get_rssi(self, iterations: int = 10) -> dict[str, int]:
rssi = self._compute_rssi()
noise = random.gauss(0.0, 30.0)
return {
"reads": iterations,
"average": int(rssi),
"current": int(rssi + noise),
}
def enable_lna(self) -> None:
pass # No-op in demo mode.
def get_lock_status(self) -> str:
rssi = int(self._compute_rssi())
locked = 1 if rssi > 1500 else 0
return f"Lock:{locked} rssi:{rssi} cnt:0"
def get_dvb_config(self) -> str:
return _DVB_CONFIG
def get_channel_params(self) -> str:
return _CHANNEL_PARAMS
# ------------------------------------------------------------------
# A3981
# ------------------------------------------------------------------
def get_a3981_diag(self) -> str:
return "AZ DIAG: OK\nEL DIAG: OK"
def get_a3981_modes(self) -> dict[str, str]:
return {
"step_mode": "AZ Step Size Mode = AUTO\nEL Step Size Mode = AUTO",
"current_mode": "AZ: Mode = AUTO\nEL: Mode = AUTO",
"step_size": (
"KEY: FULL-16, HALF-8, QTR-4, EIGHTH-2, SIXTEENTH-1\n"
"AZ Step Size:1\n"
"EL Step Size:1"
),
}
def get_a3981_torque(self) -> str:
if self._is_moving:
return "AZ Torq:HIGH\nEL Torq:HIGH"
return "AZ Torq:LOW\nEL Torq:LOW"
# ------------------------------------------------------------------
# NVS
# ------------------------------------------------------------------
def nvs_dump(self) -> str:
return _NVS_DUMP_TEXT
def nvs_read(self, index: int) -> str:
line = _NVS_LINES.get(index)
if line:
return line
return f"NVS index {index} not found"
# ------------------------------------------------------------------
# ADC
# ------------------------------------------------------------------
def get_adc_rssi(self) -> str:
rssi = self._compute_rssi()
return str(int(rssi))
def get_board_id(self) -> str:
return "STATIONARY"
# ------------------------------------------------------------------
# OS
# ------------------------------------------------------------------
def get_firmware_id(self) -> str:
return _FIRMWARE_ID
# ------------------------------------------------------------------
# Raw / Console
# ------------------------------------------------------------------
def send_raw(self, cmd: str) -> str:
"""Simulate firmware console with basic submenu tracking."""
cmd_stripped = cmd.strip().lower()
# Submenu navigation.
if cmd_stripped == "q":
self._menu = _DemoMenu.ROOT
return "TRK>"
_enter_map: dict[str, _DemoMenu] = {
"mot": _DemoMenu.MOT,
"dvb": _DemoMenu.DVB,
"nvs": _DemoMenu.NVS,
"a3981": _DemoMenu.A3981,
"adc": _DemoMenu.ADC,
"os": _DemoMenu.OS,
"step": _DemoMenu.STEP,
"peak": _DemoMenu.PEAK,
"eeprom": _DemoMenu.EEPROM,
"gpio": _DemoMenu.GPIO,
"latlon": _DemoMenu.LATLON,
"dipswitch": _DemoMenu.DIPSWITCH,
}
if cmd_stripped in _enter_map and self._menu == _DemoMenu.ROOT:
self._menu = _enter_map[cmd_stripped]
prompt = cmd_stripped.upper() + ">"
return prompt
# Context-dependent responses.
if self._menu == _DemoMenu.MOT:
return self._handle_mot(cmd_stripped)
if self._menu == _DemoMenu.DVB:
return self._handle_dvb(cmd_stripped)
if self._menu == _DemoMenu.NVS:
return self._handle_nvs(cmd_stripped)
if self._menu == _DemoMenu.A3981:
return self._handle_a3981(cmd_stripped)
if self._menu == _DemoMenu.ADC:
return self._handle_adc(cmd_stripped)
if self._menu == _DemoMenu.OS:
return self._handle_os(cmd_stripped)
if self._menu == _DemoMenu.ROOT:
return self._handle_root(cmd_stripped)
return f"Unknown command: {cmd}\nTRK>"
def _handle_root(self, cmd: str) -> str:
if cmd in ("?", "help"):
return (
"Available commands:\n"
" a3981 adc dipswitch dvb eeprom gpio\n"
" latlon mot nvs os peak step\n"
" q reboot stow\n"
"TRK>"
)
if cmd == "reboot":
return "Rebooting...\nApplication Starting Kinetis PCB...\nTRK>"
return f"Unknown command: {cmd}\nTRK>"
def _handle_mot(self, cmd: str) -> str:
if cmd in ("?", "help"):
return (
"Available commands:\n"
" a azscan azscanwxp e ela2s elminmaxhome\n"
" els2a g h l life ma motorboth motorlife\n"
" mv p pid r sd sp sw v vms w\n"
"MOT>"
)
if cmd == "a":
self._update_position()
return f" Angle[0] = {self._az:.2f}\n Angle[1] = {self._el:.2f}\nMOT>"
if cmd == "l":
return "Motors:\n 0 - AZIMUTH: local\n 1 - ELEVATION: local\nMOT>"
if cmd == "e":
self._engaged = True
return "Motors engaged\nMOT>"
if cmd == "r":
self._engaged = False
return "Motors released\nMOT>"
if cmd == "elminmaxhome":
return "Min: 1800 Max: 6500 Home: 6500\nMOT>"
if cmd == "life":
return _MOTOR_LIFE + "\nMOT>"
if cmd.startswith("a "):
parts = cmd.split()
if len(parts) >= 3:
motor_id = int(parts[1])
degrees = float(parts[2])
if motor_id == 0:
self._target_az = degrees
elif motor_id == 1:
self._target_el = degrees
self._last_move_time = time.monotonic()
return f" Angle = {degrees:.2f}\nMOT>"
return "Invalid parameters\nMOT>"
if cmd.startswith("h "):
parts = cmd.split()
if len(parts) >= 2:
motor_id = int(parts[1])
self.home_motor(motor_id)
return f"Homing motor {motor_id}\nMOT>"
return "Invalid parameters\nMOT>"
if cmd == "mv":
return "Max Vel [0] = 65.0 Max Vel [1] = 45.0\nMOT>"
if cmd == "ma":
return "Accel[0] = 400.0 Accel[1] = 400.0\nMOT>"
if cmd == "p":
self._update_position()
az_steps = int(self._az * 40000 / 360)
el_steps = int(self._el * 24960 / 360)
return f"Position[0] = {az_steps} Position[1] = {el_steps}\nMOT>"
return f"Unknown command: {cmd}\nMOT>"
def _handle_dvb(self, cmd: str) -> str:
if cmd in ("?", "help"):
return (
"Available commands:\n"
" agc config def diag dis e freqs\n"
" lnbdc lnbv ls man msw nid pwr\n"
" qls range rssi shuf snr srch srch_mode\n"
" stats t table tablex tabto to\n"
"DVB>"
)
if cmd.startswith("rssi"):
rssi_val = int(self._compute_rssi())
parts = cmd.split()
iters = int(parts[1]) if len(parts) > 1 else 10
noise = random.gauss(0.0, 30.0)
cur = int(rssi_val + noise)
return (
f"iterations:{iters} interval(msec):20\n"
f" Reads:{iters} RSSI[avg: {rssi_val} cur: {cur}]\n"
"DVB>"
)
if cmd == "config":
return _DVB_CONFIG + "\nDVB>"
if cmd == "dis":
return _CHANNEL_PARAMS + "\nDVB>"
if cmd == "lnbdc odu":
return "Enabled LNB ODU 13V\nDVB>"
if cmd == "qls":
rssi_val = int(self._compute_rssi())
locked = 1 if rssi_val > 1500 else 0
return f"Lock:{locked} rssi:{rssi_val} cnt:0\nDVB>"
return f"Unknown command: {cmd}\nDVB>"
def _handle_nvs(self, cmd: str) -> str:
if cmd in ("?", "help"):
return "Available commands:\n d e s\nNVS>"
if cmd == "d":
return _NVS_DUMP_TEXT + "\nNVS>"
if cmd == "s":
return "NVS saved\nNVS>"
if cmd.startswith("e "):
parts = cmd.split()
if len(parts) >= 2:
try:
idx = int(parts[1])
line = _NVS_LINES.get(idx)
if line:
return line + "\nNVS>"
return f"NVS index {idx} not found\nNVS>"
except ValueError:
pass
return "Invalid parameters\nNVS>"
return f"Unknown command: {cmd}\nNVS>"
def _handle_a3981(self, cmd: str) -> str:
if cmd in ("?", "help"):
return "Available commands:\n cm diag reset sm ss st\nA3981>"
if cmd == "diag":
return "AZ DIAG: OK\nEL DIAG: OK\nA3981>"
if cmd == "sm":
return "AZ Step Size Mode = AUTO\nEL Step Size Mode = AUTO\nA3981>"
if cmd == "cm":
return "AZ: Mode = AUTO\nEL: Mode = AUTO\nA3981>"
if cmd == "ss":
return (
"KEY: FULL-16, HALF-8, QTR-4, EIGHTH-2, SIXTEENTH-1\n"
"AZ Step Size:1\n"
"EL Step Size:1\n"
"A3981>"
)
if cmd == "st":
if self._is_moving:
return "AZ Torq:HIGH\nEL Torq:HIGH\nA3981>"
return "AZ Torq:LOW\nEL Torq:LOW\nA3981>"
if cmd == "reset":
return "Az/El A3981 Faults Reset.\nA3981>"
return f"Unknown command: {cmd}\nA3981>"
def _handle_adc(self, cmd: str) -> str:
if cmd in ("?", "help"):
return "Available commands:\n bdid bdrevid m rssi scan\nADC>"
if cmd == "rssi":
return str(int(self._compute_rssi())) + "\nADC>"
if cmd == "bdid":
return "STATIONARY\nADC>"
if cmd == "bdrevid":
return "A\nADC>"
return f"Unknown command: {cmd}\nADC>"
def _handle_os(self, cmd: str) -> str:
if cmd in ("?", "help"):
return "Available commands:\n id reboot\nOS>"
if cmd == "id":
return _FIRMWARE_ID + "\nOS>"
if cmd == "reboot":
return "Rebooting...\nApplication Starting Kinetis PCB...\nTRK>"
return f"Unknown command: {cmd}\nOS>"

View File

@ -0,0 +1 @@
"""TUI screen modules — one per F-key mode."""

View File

@ -0,0 +1,232 @@
"""F5 Console Screen -- raw serial terminal with color-coded prompts
and command history."""
import re
from textual import work
from textual.app import ComposeResult
from textual.containers import Container, Horizontal
from textual.events import Key
from textual.widgets import Button, Input, Static
from birdcage_tui.widgets.serial_log import SerialLog
_KNOWN_PROMPTS = [
"TRK>",
"MOT>",
"DVB>",
"NVS>",
"A3981>",
"ADC>",
"OS>",
"STEP>",
"PEAK>",
"EE>",
"GPIO>",
"LATLON>",
"DIPSWITCH>",
]
# Pattern to detect NVS write commands: "nvs" ... "e <idx> <value>"
# or just "e <idx> <value>" when already in the NVS submenu.
_NVS_WRITE_RE = re.compile(r"e\s+\d+\s+\S+")
def _detect_prompt(text: str) -> str | None:
"""Find the last known prompt in the response text."""
last_prompt = None
last_pos = -1
for prompt in _KNOWN_PROMPTS:
pos = text.rfind(prompt)
if pos > last_pos:
last_pos = pos
last_prompt = prompt
return last_prompt
class ConsoleScreen(Container):
"""F5: Raw serial console for direct firmware interaction."""
def __init__(self, **kwargs) -> None:
super().__init__(**kwargs)
self._device: object | None = None
self._command_history: list[str] = []
self._history_idx: int = 0
self._cmd_count: int = 0
self._prompt_ctx: str = "TRK>"
self._last_dangerous_cmd: str | None = None
def compose(self) -> ComposeResult:
with Container(classes="screen-container"):
yield SerialLog(id="serial-log")
with Horizontal(classes="console-context"):
yield Static("Context: TRK>", id="console-context")
yield Static(" Commands: 0", id="console-cmd-count")
with Horizontal(classes="console-input-area"):
yield Static("> ", classes="label")
yield Input(placeholder="Enter command...", id="console-input")
yield Button("Send", id="btn-send", variant="primary")
def set_device(self, device: object) -> None:
"""Store the device reference and show a welcome message."""
self._device = device
serial_log = self.query_one("#serial-log", SerialLog)
# Determine connection description.
if hasattr(device, "demo_mode") or type(device).__name__ == "DemoDevice":
mode_label = "DEMO"
else:
mode_label = getattr(device, "firmware_name", "Live")
port = getattr(self.app, "serial_port", "/dev/ttyUSB0") if self.app else "---"
serial_log.append_output("Birdcage Console -- type ? for help")
serial_log.append_output(f"Connected to: {mode_label} / {port}")
def _check_dangerous(self, cmd: str) -> str | None:
"""Return a warning message if the command is dangerous, or None if safe.
If the same dangerous command is sent twice in a row, allow it through
(the user is insisting).
"""
stripped = cmd.strip()
lower = stripped.lower()
# Same dangerous command repeated -- user is insisting.
is_repeat = (
self._last_dangerous_cmd is not None
and stripped == self._last_dangerous_cmd
)
if is_repeat:
self._last_dangerous_cmd = None
return None
warning = None
if lower == "q" and self._prompt_ctx == "TRK>":
warning = (
"Warning: 'q' at root kills the shell! "
"Use submenu-level 'q' to exit submenus."
)
elif lower == "reboot":
warning = "Warning: 'reboot' will restart the dish firmware."
elif _NVS_WRITE_RE.search(lower):
warning = "Warning: NVS write detected. Are you sure?"
if warning is not None:
self._last_dangerous_cmd = stripped
else:
self._last_dangerous_cmd = None
return warning
def _do_send(self, cmd_text: str) -> None:
"""Validate and dispatch a command. Called on Enter or Send button."""
cmd_text = cmd_text.strip()
if not cmd_text:
return
# Safety gate.
warning = self._check_dangerous(cmd_text)
if warning is not None:
self.notify(warning, severity="warning", timeout=5)
return
# Record in history.
self._command_history.append(cmd_text)
self._history_idx = len(self._command_history)
# Show the command in the log immediately.
serial_log = self.query_one("#serial-log", SerialLog)
serial_log.append_command(cmd_text)
# Clear input right away so the user can type while waiting.
self.query_one("#console-input", Input).value = ""
# Dispatch to worker thread (serial I/O blocks).
self._send_command(cmd_text)
@work(thread=True)
def _send_command(self, cmd: str) -> None:
"""Send the command over serial and update the UI with the response."""
if self._device is None:
self.app.call_from_thread(
self.notify, "No device connected", severity="error"
)
return
try:
response = self._device.send_raw(cmd)
except Exception as exc:
self.app.call_from_thread(self._on_response, f"ERROR: {exc}", cmd)
return
self.app.call_from_thread(self._on_response, response, cmd)
def _on_response(self, response: str, cmd: str) -> None:
"""Process the firmware response on the main thread."""
serial_log = self.query_one("#serial-log", SerialLog)
serial_log.append_output(response)
# Detect prompt context from the response.
detected = _detect_prompt(response)
if detected is not None:
self._prompt_ctx = detected
ctx_label = self.query_one("#console-context", Static)
ctx_label.update(f"Context: {self._prompt_ctx}")
# Update command count.
self._cmd_count += 1
count_label = self.query_one("#console-cmd-count", Static)
count_label.update(f" Commands: {self._cmd_count}")
# ------------------------------------------------------------------
# Event handlers
# ------------------------------------------------------------------
def on_input_submitted(self, event: Input.Submitted) -> None:
"""Handle Enter key in the command input."""
if event.input.id == "console-input":
self._do_send(event.value)
def on_button_pressed(self, event: Button.Pressed) -> None:
"""Handle the Send button click."""
if event.button.id == "btn-send":
cmd_input = self.query_one("#console-input", Input)
self._do_send(cmd_input.value)
def on_key(self, event: Key) -> None:
"""Handle up/down arrow keys for command history navigation."""
cmd_input = self.query_one("#console-input", Input)
# Only respond when the input widget has focus.
if not cmd_input.has_focus:
return
if event.key == "up":
event.prevent_default()
event.stop()
if not self._command_history:
return
self._history_idx = max(0, self._history_idx - 1)
cmd_input.value = self._command_history[self._history_idx]
cmd_input.cursor_position = len(cmd_input.value)
elif event.key == "down":
event.prevent_default()
event.stop()
if not self._command_history:
return
self._history_idx = min(len(self._command_history), self._history_idx + 1)
if self._history_idx >= len(self._command_history):
cmd_input.value = ""
else:
cmd_input.value = self._command_history[self._history_idx]
cmd_input.cursor_position = len(cmd_input.value)
elif event.key == "ctrl+l":
event.prevent_default()
event.stop()
serial_log = self.query_one("#serial-log", SerialLog)
serial_log.clear()

View File

@ -0,0 +1,282 @@
"""F1 Position screen -- AZ/EL display, manual moves, homing, engage/release.
Widget container for ContentSwitcher. Polls the device at 2 Hz for
position and step data, drives the compass rose, motor status panel,
and AZ/EL sparklines. Bottom row provides manual move controls.
"""
import logging
import time
from textual import work
from textual.app import ComposeResult
from textual.binding import Binding
from textual.containers import Container, Horizontal, Vertical
from textual.widgets import Button, Input, Static
from textual.worker import Worker
from birdcage_tui.widgets.compass_rose import CompassRose
from birdcage_tui.widgets.motor_status import MotorStatus
from birdcage_tui.widgets.sparkline_widget import SparklineWidget
log = logging.getLogger(__name__)
class PositionScreen(Container):
"""F1: Position control and monitoring."""
can_focus = True
BINDINGS = [
Binding("left", "nudge_az(-1)", "AZ -1", show=False),
Binding("right", "nudge_az(1)", "AZ +1", show=False),
Binding("up", "nudge_el(1)", "EL +1", show=False),
Binding("down", "nudge_el(-1)", "EL -1", show=False),
Binding("h", "home_both", "Home Both", show=False),
Binding("e", "engage_motors", "Engage", show=False),
Binding("r", "release_motors", "Release", show=False),
]
def __init__(self, **kwargs) -> None:
super().__init__(**kwargs)
self._device: object = None
self._polling = False
self._engaged = False
self._poll_worker: Worker | None = None
# Track last-known position for nudge commands.
self._last_az = 180.0
self._last_el = 45.0
def compose(self) -> ComposeResult:
with Container(classes="screen-container"):
with Horizontal(classes="top-row"):
yield CompassRose(id="compass")
with Vertical(classes="panel"):
yield Static("Motor Status", classes="panel-title")
yield MotorStatus(id="motor-status")
with Vertical():
yield SparklineWidget(
max_points=80, label="AZ", color="#00d4aa", id="az-spark"
)
yield SparklineWidget(
max_points=80, label="EL", color="#00b8c8", id="el-spark"
)
with Horizontal(classes="bottom-controls"):
yield Static("AZ ", classes="label")
yield Input(placeholder="180.0", id="az-input", type="number")
yield Static(" EL ", classes="label")
yield Input(placeholder="45.0", id="el-input", type="number")
yield Button("Move", id="btn-move", variant="primary")
yield Button("Home AZ", id="btn-home-az")
yield Button("Home EL", id="btn-home-el")
yield Button("E/R", id="btn-engage")
# ------------------------------------------------------------------
# Device lifecycle
# ------------------------------------------------------------------
def set_device(self, device: object) -> None:
"""Store the device reference and start position polling."""
self._device = device
self._polling = True
self._poll_worker = self._do_position_poll()
def on_show(self) -> None:
"""Resume polling when this screen becomes visible."""
if self._device is not None and not self._polling:
self._polling = True
self._poll_worker = self._do_position_poll()
# ------------------------------------------------------------------
# Position poll worker
# ------------------------------------------------------------------
@work(thread=True, exclusive=True, group="position-poll")
def _do_position_poll(self) -> None:
"""Poll device at ~2 Hz for position and step data."""
while self._polling:
if self._device is None:
time.sleep(0.5)
continue
try:
pos = self._device.get_position()
az = pos["azimuth"]
el = pos["elevation"]
self._last_az = az
self._last_el = el
self.app.call_from_thread(self._update_compass, az, el)
self.app.call_from_thread(self._push_sparklines, az, el)
except Exception:
log.debug("Position poll failed", exc_info=True)
try:
steps = self._device.get_step_positions()
self.app.call_from_thread(
self._update_motor_steps,
steps["az_steps"],
steps["el_steps"],
)
except Exception:
log.debug("Step position poll failed", exc_info=True)
# Poll torque state from A3981
try:
torque_resp = self._device.get_a3981_torque()
lines = torque_resp.split("\n")
az_torque = "HIGH" if "HIGH" in lines[0] else "LOW"
el_torque = "HIGH" if len(lines) > 1 and "HIGH" in lines[1] else "LOW"
self.app.call_from_thread(self._update_torque, az_torque, el_torque)
except Exception:
log.debug("Torque poll failed", exc_info=True)
time.sleep(0.5)
# ------------------------------------------------------------------
# Thread-safe widget update callbacks
# ------------------------------------------------------------------
def _update_compass(self, az: float, el: float) -> None:
compass = self.query_one("#compass", CompassRose)
compass.azimuth = az
compass.elevation = el
def _push_sparklines(self, az: float, el: float) -> None:
self.query_one("#az-spark", SparklineWidget).push(az)
self.query_one("#el-spark", SparklineWidget).push(el)
def _update_motor_steps(self, az_steps: int, el_steps: int) -> None:
motor = self.query_one("#motor-status", MotorStatus)
motor.az_steps = az_steps
motor.el_steps = el_steps
def _update_torque(self, az_torque: str, el_torque: str) -> None:
motor = self.query_one("#motor-status", MotorStatus)
motor.az_torque = az_torque
motor.el_torque = el_torque
def _update_engaged(self, engaged: bool) -> None:
motor = self.query_one("#motor-status", MotorStatus)
motor.engaged = engaged
# ------------------------------------------------------------------
# Button handlers
# ------------------------------------------------------------------
def on_button_pressed(self, event: Button.Pressed) -> None:
button_id = event.button.id or ""
if button_id == "btn-move":
self._handle_move()
elif button_id == "btn-home-az":
self._handle_home(0)
elif button_id == "btn-home-el":
self._handle_home(1)
elif button_id == "btn-engage":
self._handle_engage_toggle()
def _handle_move(self) -> None:
"""Read AZ/EL inputs and issue a move command."""
if self._device is None:
return
az_input = self.query_one("#az-input", Input)
el_input = self.query_one("#el-input", Input)
try:
az = float(az_input.value) if az_input.value.strip() else self._last_az
except ValueError:
self.app.notify("Invalid AZ value", severity="warning")
return
try:
el = float(el_input.value) if el_input.value.strip() else self._last_el
except ValueError:
self.app.notify("Invalid EL value", severity="warning")
return
self._run_motor_command(self._device.move_to, az, el)
def _handle_home(self, motor_id: int) -> None:
"""Home a specific motor axis."""
if self._device is None:
return
axis = "AZ" if motor_id == 0 else "EL"
self.app.notify(f"Homing {axis}...", severity="information")
self._run_motor_command(self._device.home_motor, motor_id)
def _handle_engage_toggle(self) -> None:
"""Toggle motor engage/release state."""
if self._device is None:
return
if self._engaged:
self._run_motor_command(self._device.release)
self._engaged = False
self._update_engaged(False)
self.app.notify("Motors released")
else:
self._run_motor_command(self._device.engage)
self._engaged = True
self._update_engaged(True)
self.app.notify("Motors engaged")
# ------------------------------------------------------------------
# Key binding actions
# ------------------------------------------------------------------
def action_nudge_az(self, delta: int) -> None:
"""Nudge azimuth by delta degrees."""
if self._device is None:
return
new_az = self._last_az + delta
self._run_motor_command(self._device.move_motor, 0, new_az)
def action_nudge_el(self, delta: int) -> None:
"""Nudge elevation by delta degrees."""
if self._device is None:
return
new_el = self._last_el + delta
self._run_motor_command(self._device.move_motor, 1, new_el)
def action_home_both(self) -> None:
"""Home both AZ and EL motors."""
if self._device is None:
return
self.app.notify("Homing AZ + EL...", severity="information")
self._run_motor_command(self._device.home_motor, 0)
self._run_motor_command(self._device.home_motor, 1)
def action_engage_motors(self) -> None:
"""Engage (energize) stepper motors."""
if self._device is None:
return
self._run_motor_command(self._device.engage)
self._engaged = True
self._update_engaged(True)
self.app.notify("Motors engaged")
def action_release_motors(self) -> None:
"""Release (de-energize) stepper motors."""
if self._device is None:
return
self._run_motor_command(self._device.release)
self._engaged = False
self._update_engaged(False)
self.app.notify("Motors released")
# ------------------------------------------------------------------
# Helpers
# ------------------------------------------------------------------
@work(thread=True, exclusive=False, group="motor-cmd")
def _run_motor_command(self, fn, *args) -> None:
"""Execute a motor command in a worker thread."""
try:
fn(*args)
except Exception:
log.exception("Motor command failed")
self.app.call_from_thread(
self.app.notify, "Motor command failed", severity="error"
)

View File

@ -0,0 +1,272 @@
"""F3 Scan Screen -- AZ sweep heatmap, sky mapping with configurable parameters.
Grid-based sky scan: iterates over AZ/EL range, moves the dish to each point,
reads RSSI, and paints the result into a 2D heatmap. Supports CSV export of
raw (az, el, rssi) data for offline analysis.
"""
import csv
import logging
import time
from pathlib import Path
from textual import work
from textual.containers import Container, Horizontal, Vertical
from textual.widgets import Button, Input, ProgressBar, Static
from textual.worker import get_current_worker
from birdcage_tui.widgets.sky_heatmap import SkyHeatmap
from birdcage_tui.widgets.sparkline_widget import SparklineWidget
log = logging.getLogger(__name__)
# Type alias -- SerialBridge and DemoDevice share the same duck-typed interface.
DeviceLike = object
class ScanScreen(Container):
"""F3: Sky scan and RF mapping."""
def __init__(self, **kwargs) -> None:
super().__init__(**kwargs)
self._device: DeviceLike | None = None
self._scanning = False
self._scan_data: list[tuple[float, float, float]] = []
def compose(self):
with Container(classes="screen-container"):
with Vertical(classes="panel"):
yield Static("Sky Scan", classes="panel-title")
yield SkyHeatmap(az_bins=40, el_bins=10, id="heatmap")
yield SparklineWidget(
max_points=80, label="Sweep RSSI", color="#00d4aa", id="sweep-spark"
)
with Horizontal(classes="scan-status"):
yield Static("Idle", id="scan-status-text")
yield ProgressBar(id="scan-progress", total=100, show_eta=False)
with Horizontal(classes="bottom-controls"):
yield Static("AZ ", classes="label")
yield Input(value="160", id="az-start", type="number")
yield Static("-", classes="label")
yield Input(value="220", id="az-end", type="number")
yield Static(" Step ", classes="label")
yield Input(value="1.5", id="az-step", type="number")
yield Static(" EL ", classes="label")
yield Input(value="18", id="el-start", type="number")
yield Static("-", classes="label")
yield Input(value="65", id="el-end", type="number")
yield Static(" Step ", classes="label")
yield Input(value="5.0", id="el-step", type="number")
with Horizontal(classes="bottom-controls"):
yield Static("Transponders ", classes="label")
yield Input(value="3", id="xponder-input", type="integer")
yield Button("Start Scan", id="btn-start-scan", variant="primary")
yield Button("Stop", id="btn-stop-scan")
yield Button("Export CSV", id="btn-export")
# ------------------------------------------------------------------
# Device wiring
# ------------------------------------------------------------------
def set_device(self, device: DeviceLike) -> None:
"""Store the device reference (SerialBridge or DemoDevice)."""
self._device = device
# ------------------------------------------------------------------
# Input helpers
# ------------------------------------------------------------------
def _read_float(self, widget_id: str, fallback: float) -> float:
"""Read a float from an Input widget, returning *fallback* on parse error."""
try:
return float(self.query_one(f"#{widget_id}", Input).value)
except (ValueError, TypeError):
return fallback
def _read_int(self, widget_id: str, fallback: int) -> int:
"""Read an int from an Input widget, returning *fallback* on parse error."""
try:
return int(self.query_one(f"#{widget_id}", Input).value)
except (ValueError, TypeError):
return fallback
# ------------------------------------------------------------------
# Scan worker
# ------------------------------------------------------------------
@work(thread=True)
def _do_scan(self) -> None:
"""Execute the AZ/EL grid scan in a background thread."""
worker = get_current_worker()
device = self._device
if device is None:
return
# Read scan parameters (widget access must happen via call_from_thread
# for Input.value, but Textual Input.value is a reactive that is safe
# to read from threads as a string snapshot).
az_start = self._read_float("az-start", 160.0)
az_end = self._read_float("az-end", 220.0)
az_step = self._read_float("az-step", 1.5)
el_start = self._read_float("el-start", 18.0)
el_end = self._read_float("el-end", 65.0)
el_step = self._read_float("el-step", 5.0)
iterations = self._read_int("xponder-input", 3)
# Clamp step sizes to something sane.
if az_step <= 0:
az_step = 1.0
if el_step <= 0:
el_step = 1.0
# Build the grid point list.
el_values: list[float] = []
el = el_start
while el <= el_end + 1e-9:
el_values.append(round(el, 2))
el += el_step
az_values: list[float] = []
az = az_start
while az <= az_end + 1e-9:
az_values.append(round(az, 2))
az += az_step
total_points = len(el_values) * len(az_values)
if total_points == 0:
self.app.call_from_thread(
self._set_status, "No grid points -- check parameters"
)
return
heatmap = self.query_one("#heatmap", SkyHeatmap)
spark = self.query_one("#sweep-spark", SparklineWidget)
done = 0
for _el_idx, el_val in enumerate(el_values):
for _az_idx, az_val in enumerate(az_values):
if not self._scanning or worker.is_cancelled:
self.app.call_from_thread(self._set_status, "Scan stopped")
return
# Move dish.
try:
device.move_to(az_val, el_val)
except Exception:
log.exception("move_to failed at AZ=%.2f EL=%.2f", az_val, el_val)
msg = f"Move error at AZ={az_val:.1f} EL={el_val:.1f}"
self.app.call_from_thread(self._set_status, msg)
continue
# Settle time -- let the motor stop and vibrations damp.
time.sleep(0.3)
# Read signal.
try:
rssi_data = device.get_rssi(iterations)
rssi = float(rssi_data.get("average", 0))
except Exception:
log.exception("get_rssi failed at AZ=%.2f EL=%.2f", az_val, el_val)
rssi = 0.0
# Record raw data.
self._scan_data.append((az_val, el_val, rssi))
# Map to heatmap grid indices -- fit into fixed-size bins.
az_span = az_end - az_start + 1e-9
el_span = el_end - el_start + 1e-9
grid_az = min(
int((az_val - az_start) / az_span * heatmap.az_bins),
heatmap.az_bins - 1,
)
grid_el = min(
int((el_val - el_start) / el_span * heatmap.el_bins),
heatmap.el_bins - 1,
)
# Update widgets.
self.app.call_from_thread(heatmap.set_point, grid_az, grid_el, rssi)
self.app.call_from_thread(heatmap.set_active, grid_az, grid_el)
self.app.call_from_thread(spark.push, rssi)
done += 1
pct = int(done * 100 / total_points)
status_text = (
f"Scanning AZ={az_val:.1f} EL={el_val:.1f} "
f"RSSI={rssi:.0f} [{done}/{total_points}]"
)
self.app.call_from_thread(self._set_progress, pct, status_text)
msg = f"Scan complete -- {total_points} points"
self.app.call_from_thread(self._set_status, msg)
# ------------------------------------------------------------------
# Widget update helpers (called via call_from_thread)
# ------------------------------------------------------------------
def _set_status(self, text: str) -> None:
"""Update the scan status text label."""
self.query_one("#scan-status-text", Static).update(text)
def _set_progress(self, pct: int, status_text: str) -> None:
"""Update both progress bar and status text."""
self.query_one("#scan-progress", ProgressBar).update(progress=pct)
self.query_one("#scan-status-text", Static).update(status_text)
# ------------------------------------------------------------------
# Button handlers
# ------------------------------------------------------------------
def on_button_pressed(self, event: Button.Pressed) -> None:
button_id = event.button.id or ""
if button_id == "btn-start-scan":
self._start_scan()
elif button_id == "btn-stop-scan":
self._stop_scan()
elif button_id == "btn-export":
self._export_csv()
def _start_scan(self) -> None:
"""Clear state and kick off the scan worker."""
if self._device is None:
self.app.notify("No device connected", severity="warning")
return
if self._scanning:
self.app.notify("Scan already in progress", severity="warning")
return
# Reset.
heatmap = self.query_one("#heatmap", SkyHeatmap)
heatmap.clear()
self._scan_data.clear()
self.query_one("#scan-progress", ProgressBar).update(progress=0)
self._set_status("Starting scan...")
self._scanning = True
self._do_scan()
def _stop_scan(self) -> None:
"""Signal the scan worker to stop."""
self._scanning = False
self._set_status("Stopping...")
def _export_csv(self) -> None:
"""Write scan data to /tmp/birdcage_scan.csv."""
if not self._scan_data:
self.app.notify("No scan data to export", severity="warning")
return
output = Path("/tmp/birdcage_scan.csv")
try:
with output.open("w", newline="") as fh:
writer = csv.writer(fh)
writer.writerow(["az", "el", "rssi"])
for az, el, rssi in self._scan_data:
writer.writerow([f"{az:.2f}", f"{el:.2f}", f"{rssi:.1f}"])
self.app.notify(f"Exported {len(self._scan_data)} points to {output}")
except OSError as exc:
log.exception("CSV export failed")
self.app.notify(f"Export failed: {exc}", severity="error")

View File

@ -0,0 +1,233 @@
"""F2 Signal screen -- RSSI monitoring, sparklines, LNB control.
Widget container for ContentSwitcher. Provides start/stop signal
monitoring with configurable iteration count and poll rate, dual
sparklines for DVB and ADC RSSI, peak tracking, and LNA toggle.
"""
import logging
import re
import time
from textual import work
from textual.app import ComposeResult
from textual.containers import Container, Horizontal, Vertical
from textual.widgets import Button, Input, Static
from textual.worker import Worker
from birdcage_tui.widgets.signal_gauge import SignalGauge
from birdcage_tui.widgets.sparkline_widget import SparklineWidget
log = logging.getLogger(__name__)
class SignalScreen(Container):
"""F2: Signal monitoring and RSSI display."""
def __init__(self, **kwargs) -> None:
super().__init__(**kwargs)
self._device: object = None
self._monitoring = False
self._lna_enabled = False
self._peak_rssi = 0
self._total_samples = 0
self._signal_worker: Worker | None = None
def compose(self) -> ComposeResult:
with Container(classes="screen-container"):
with Vertical(classes="panel"):
yield Static("Signal Strength", classes="panel-title")
yield SignalGauge(id="signal-gauge")
with Vertical():
yield SparklineWidget(
max_points=80, label="DVB RSSI", color="#00d4aa", id="dvb-spark"
)
yield SparklineWidget(
max_points=80, label="ADC RSSI", color="#2080d0", id="adc-spark"
)
with Horizontal(classes="panel"):
yield Static("Samples: 0", id="sample-count", classes="label")
yield Static(" Peak: 0", id="peak-value", classes="label")
yield Static(" LNA: OFF", id="lna-status", classes="label")
yield Static(" Lock: NO", id="lock-status", classes="label")
with Horizontal(classes="bottom-controls"):
yield Static("Iters ", classes="label")
yield Input(value="10", id="iter-input", type="integer")
yield Static(" Rate ", classes="label")
yield Input(value="2", id="rate-input", type="integer")
yield Button("Start", id="btn-start", variant="primary")
yield Button("Stop", id="btn-stop")
yield Button("Enable LNA", id="btn-lna")
# ------------------------------------------------------------------
# Device lifecycle
# ------------------------------------------------------------------
def set_device(self, device: object) -> None:
"""Store the device reference."""
self._device = device
def on_show(self) -> None:
"""Called when this screen becomes visible."""
pass # Monitoring is explicit via Start/Stop buttons.
# ------------------------------------------------------------------
# Signal poll worker
# ------------------------------------------------------------------
@work(thread=True, exclusive=True, group="signal-poll")
def _do_signal_poll(self) -> None:
"""Poll RSSI at the configured rate while monitoring is active."""
while self._monitoring:
if self._device is None:
time.sleep(0.5)
continue
# Read config from inputs (safe defaults on parse failure).
try:
iterations = int(
self.app.call_from_thread(self._read_input, "iter-input") or "10"
)
iterations = max(1, iterations)
except (ValueError, TypeError):
iterations = 10
try:
rate = int(
self.app.call_from_thread(self._read_input, "rate-input") or "2"
)
rate = max(1, rate)
except (ValueError, TypeError):
rate = 2
# DVB RSSI (bounded, averaged)
try:
rssi = self._device.get_rssi(iterations)
rssi_avg = rssi["average"]
rssi_cur = rssi["current"]
reads = rssi["reads"]
self._total_samples += reads
if rssi_cur > self._peak_rssi:
self._peak_rssi = rssi_cur
self.app.call_from_thread(self._update_gauge, rssi_avg, rssi_cur, reads)
self.app.call_from_thread(self._push_dvb_spark, float(rssi_avg))
self.app.call_from_thread(self._update_stats)
except Exception:
log.debug("DVB RSSI poll failed", exc_info=True)
# ADC RSSI (raw single-shot)
try:
adc_resp = self._device.get_adc_rssi()
adc_match = re.search(r"(\d+)", adc_resp)
if adc_match:
adc_val = float(adc_match.group(1))
self.app.call_from_thread(self._push_adc_spark, adc_val)
except Exception:
log.debug("ADC RSSI poll failed", exc_info=True)
# Lock status
try:
lock_resp = self._device.get_lock_status()
lock_match = re.search(r"Lock:(\d)", lock_resp)
if lock_match:
locked = lock_match.group(1) == "1"
self.app.call_from_thread(self._update_lock, locked)
except Exception:
log.debug("Lock status poll failed", exc_info=True)
time.sleep(1.0 / rate)
# ------------------------------------------------------------------
# Thread-safe widget update callbacks
# ------------------------------------------------------------------
def _read_input(self, input_id: str) -> str:
"""Read an Input widget's value (must run on main thread)."""
return self.query_one(f"#{input_id}", Input).value
def _update_gauge(self, rssi_avg: int, rssi_cur: int, reads: int) -> None:
gauge = self.query_one("#signal-gauge", SignalGauge)
gauge.rssi_avg = rssi_avg
gauge.rssi_cur = rssi_cur
gauge.reads = reads
def _push_dvb_spark(self, value: float) -> None:
self.query_one("#dvb-spark", SparklineWidget).push(value)
def _push_adc_spark(self, value: float) -> None:
self.query_one("#adc-spark", SparklineWidget).push(value)
def _update_stats(self) -> None:
self.query_one("#sample-count", Static).update(
f"Samples: {self._total_samples}"
)
self.query_one("#peak-value", Static).update(f" Peak: {self._peak_rssi}")
def _update_lock(self, locked: bool) -> None:
label = "YES" if locked else "NO"
self.query_one("#lock-status", Static).update(f" Lock: {label}")
def _update_lna_label(self) -> None:
label = "ON" if self._lna_enabled else "OFF"
self.query_one("#lna-status", Static).update(f" LNA: {label}")
# ------------------------------------------------------------------
# Button handlers
# ------------------------------------------------------------------
def on_button_pressed(self, event: Button.Pressed) -> None:
button_id = event.button.id or ""
if button_id == "btn-start":
self._handle_start()
elif button_id == "btn-stop":
self._handle_stop()
elif button_id == "btn-lna":
self._handle_lna()
def _handle_start(self) -> None:
"""Start signal monitoring."""
if self._device is None:
self.app.notify("No device connected", severity="warning")
return
if self._monitoring:
return
self._monitoring = True
self._signal_worker = self._do_signal_poll()
self.app.notify("Signal monitoring started")
self.query_one("#btn-start", Button).variant = "default"
self.query_one("#btn-stop", Button).variant = "warning"
def _handle_stop(self) -> None:
"""Stop signal monitoring."""
self._monitoring = False
self.app.notify("Signal monitoring stopped")
self.query_one("#btn-start", Button).variant = "primary"
self.query_one("#btn-stop", Button).variant = "default"
def _handle_lna(self) -> None:
"""Toggle LNA enable (sends lnbdc odu to set 13V)."""
if self._device is None:
self.app.notify("No device connected", severity="warning")
return
self._do_enable_lna()
@work(thread=True, exclusive=False, group="signal-cmd")
def _do_enable_lna(self) -> None:
"""Enable LNA in a worker thread (blocks on serial I/O)."""
try:
self._device.enable_lna()
self._lna_enabled = True
self.app.call_from_thread(self._update_lna_label)
self.app.call_from_thread(self.app.notify, "LNA enabled (13V ODU)")
except Exception:
log.exception("LNA enable failed")
self.app.call_from_thread(
self.app.notify, "LNA enable failed", severity="error"
)

View File

@ -0,0 +1,358 @@
"""F4 System Screen -- NVS table, A3981 diagnostics, motor dynamics, firmware info.
Aggregates hardware identity, stepper driver status, motor tuning parameters,
and the full non-volatile storage table into a single dashboard panel. All data
is fetched from the device in a background worker thread and pushed to widgets
via call_from_thread.
"""
import json
import logging
import re
from pathlib import Path
from rich.text import Text
from textual import work
from textual.containers import Container, Horizontal, Vertical
from textual.widgets import Button, Static
from textual.worker import get_current_worker
from birdcage_tui.widgets.nvs_table import NvsTable
log = logging.getLogger(__name__)
# Type alias -- SerialBridge and DemoDevice share the same duck-typed interface.
DeviceLike = object
def _parse_firmware_info(raw: str) -> Text:
"""Extract version, clock speed, and antenna ID from ``os > id`` output.
Returns a styled Rich Text suitable for a Static widget.
"""
version = "?"
clock = "?"
ant_id = "?"
m = re.search(r"Software version:\s*(\S+)", raw)
if m:
version = m.group(1)
m = re.search(r"CCLK:\s*(\d+)", raw)
if m:
mhz = int(m.group(1)) // 1_000_000
clock = f"{mhz}MHz"
m = re.search(r"Ant ID:\s*(.+?)$", raw, re.MULTILINE)
if m:
ant_id = m.group(1).strip()
result = Text()
result.append("FW: ", style="#506878")
result.append(version, style="bold #00d4aa")
result.append(" | ", style="#1a2a38")
result.append("MCU: ", style="#506878")
result.append(f"K60 {clock}", style="#c8d0d8")
result.append(" | ", style="#1a2a38")
result.append("Ant: ", style="#506878")
result.append(ant_id, style="#c8d0d8")
return result
def _format_a3981(diag: str, modes: dict[str, str], torque: str) -> Text:
"""Combine A3981 diagnostic, mode, and torque data into styled text."""
result = Text()
# Diagnostics -- highlight OK in green, FAULT in red.
for line in diag.splitlines():
line = line.strip()
if not line:
continue
if "FAULT" in line.upper():
result.append(line, style="bold #e04040")
elif "OK" in line.upper():
result.append(line, style="#00e060")
else:
result.append(line, style="#c8d0d8")
result.append("\n")
# Step mode.
sm = modes.get("step_mode", "")
for line in sm.splitlines():
line = line.strip()
if line:
result.append(line, style="#506878")
result.append("\n")
# Current mode.
cm = modes.get("current_mode", "")
for line in cm.splitlines():
line = line.strip()
if line:
result.append(line, style="#506878")
result.append("\n")
# Torque -- HIGH in warm color, LOW in dim.
for line in torque.splitlines():
line = line.strip()
if not line:
continue
if "HIGH" in line.upper():
result.append(line, style="#e8c020")
else:
result.append(line, style="#384858")
result.append("\n")
# Trim trailing newline.
text_str = result.plain
if text_str.endswith("\n"):
result.right_crop(1)
return result
def _format_motor_dynamics(
dynamics: dict[str, float],
el_limits: dict[str, float],
) -> Text:
"""Format motor velocity, acceleration, and EL limits into styled text."""
result = Text()
az_vel = dynamics.get("az_max_vel", 0.0)
el_vel = dynamics.get("el_max_vel", 0.0)
az_acc = dynamics.get("az_accel", 0.0)
el_acc = dynamics.get("el_accel", 0.0)
result.append("AZ Max Vel: ", style="#506878")
result.append(f"{az_vel:.1f}", style="bold #c8d0d8")
result.append("\u00b0/s", style="#506878")
result.append(" ", style="#0e1420")
result.append("EL Max Vel: ", style="#506878")
result.append(f"{el_vel:.1f}", style="bold #c8d0d8")
result.append("\u00b0/s", style="#506878")
result.append("\n")
result.append("AZ Accel: ", style="#506878")
result.append(f"{az_acc:.1f}", style="bold #c8d0d8")
result.append("\u00b0/s\u00b2", style="#506878")
result.append(" ", style="#0e1420")
result.append("EL Accel: ", style="#506878")
result.append(f"{el_acc:.1f}", style="bold #c8d0d8")
result.append("\u00b0/s\u00b2", style="#506878")
result.append("\n")
result.append("Steps/Rev: ", style="#506878")
result.append("40000 / 24960", style="#c8d0d8")
el_min = el_limits.get("min", 0.0)
el_max = el_limits.get("max", 0.0)
el_home = el_limits.get("home", 0.0)
result.append("\n")
result.append("EL Range: ", style="#506878")
result.append(f"{el_min:.1f}", style="#c8d0d8")
result.append("\u00b0 - ", style="#506878")
result.append(f"{el_max:.1f}", style="#c8d0d8")
result.append("\u00b0", style="#506878")
result.append(" Home: ", style="#506878")
result.append(f"{el_home:.1f}", style="bold #00d4aa")
result.append("\u00b0", style="#506878")
return result
class SystemScreen(Container):
"""F4: System information, NVS, and diagnostics."""
def __init__(self, **kwargs) -> None:
super().__init__(**kwargs)
self._device: DeviceLike | None = None
self._refreshed = False
def compose(self):
with Container(classes="screen-container"):
with Horizontal(classes="panel"):
yield Static("", id="firmware-info")
with Horizontal(classes="top-row"):
with Vertical(classes="panel"):
yield Static("A3981 Diagnostics", classes="panel-title")
yield Static("", id="a3981-diag")
with Vertical(classes="panel"):
yield Static("Motor Dynamics", classes="panel-title")
yield Static("", id="motor-dynamics")
with Vertical(classes="panel"):
yield Static("NVS Table", classes="panel-title")
yield NvsTable(id="nvs-table")
with Horizontal(classes="bottom-controls"):
yield Button("Refresh All", id="btn-refresh-all", variant="primary")
yield Button("Refresh NVS", id="btn-refresh-nvs")
yield Button("Export NVS JSON", id="btn-export-nvs")
# ------------------------------------------------------------------
# Device wiring
# ------------------------------------------------------------------
def set_device(self, device: DeviceLike) -> None:
"""Store the device reference and trigger an initial refresh."""
self._device = device
# Only auto-refresh if we're already mounted (widget tree exists).
try:
self.query_one("#firmware-info")
self._do_system_refresh()
except Exception:
# Not mounted yet -- on_show will handle it.
pass
def on_show(self) -> None:
"""Called when this screen becomes visible via ContentSwitcher."""
if self._device is not None and not self._refreshed:
self._do_system_refresh()
# ------------------------------------------------------------------
# System refresh worker
# ------------------------------------------------------------------
@work(thread=True)
def _do_system_refresh(self) -> None:
"""Fetch all system data from the device in a background thread."""
worker = get_current_worker()
device = self._device
if device is None:
return
# 1. Firmware identification.
try:
fw_raw = device.get_firmware_id()
fw_text = _parse_firmware_info(fw_raw)
self.app.call_from_thread(
self.query_one("#firmware-info", Static).update, fw_text
)
except Exception:
log.exception("Failed to read firmware ID")
self.app.call_from_thread(
self.query_one("#firmware-info", Static).update,
Text("FW: error reading firmware ID", style="#e04040"),
)
if worker.is_cancelled:
return
# 2. A3981 diagnostics.
try:
diag = device.get_a3981_diag()
modes = device.get_a3981_modes()
torque = device.get_a3981_torque()
a3981_text = _format_a3981(diag, modes, torque)
self.app.call_from_thread(
self.query_one("#a3981-diag", Static).update, a3981_text
)
except Exception:
log.exception("Failed to read A3981 data")
self.app.call_from_thread(
self.query_one("#a3981-diag", Static).update,
Text("Error reading A3981 diagnostics", style="#e04040"),
)
if worker.is_cancelled:
return
# 3. Motor dynamics.
try:
dynamics = device.get_motor_dynamics()
el_limits = device.get_el_limits()
motor_text = _format_motor_dynamics(dynamics, el_limits)
self.app.call_from_thread(
self.query_one("#motor-dynamics", Static).update, motor_text
)
except Exception:
log.exception("Failed to read motor dynamics")
self.app.call_from_thread(
self.query_one("#motor-dynamics", Static).update,
Text("Error reading motor dynamics", style="#e04040"),
)
if worker.is_cancelled:
return
# 4. NVS dump.
try:
nvs_text = device.nvs_dump()
nvs_table = self.query_one("#nvs-table", NvsTable)
self.app.call_from_thread(nvs_table.load_nvs, nvs_text)
except Exception:
log.exception("Failed to dump NVS")
self.app.call_from_thread(
self.app.notify, "NVS dump failed", severity="error"
)
self._refreshed = True
# ------------------------------------------------------------------
# NVS-only refresh worker
# ------------------------------------------------------------------
@work(thread=True)
def _do_nvs_refresh(self) -> None:
"""Refresh just the NVS table without touching other panels."""
device = self._device
if device is None:
return
try:
nvs_text = device.nvs_dump()
nvs_table = self.query_one("#nvs-table", NvsTable)
self.app.call_from_thread(nvs_table.load_nvs, nvs_text)
self.app.call_from_thread(self.app.notify, "NVS table refreshed")
except Exception:
log.exception("Failed to refresh NVS")
self.app.call_from_thread(
self.app.notify, "NVS refresh failed", severity="error"
)
# ------------------------------------------------------------------
# Button handlers
# ------------------------------------------------------------------
def on_button_pressed(self, event: Button.Pressed) -> None:
button_id = event.button.id or ""
if button_id == "btn-refresh-all":
self._handle_refresh_all()
elif button_id == "btn-refresh-nvs":
self._handle_refresh_nvs()
elif button_id == "btn-export-nvs":
self._export_nvs_json()
def _handle_refresh_all(self) -> None:
"""Kick off a full system refresh."""
if self._device is None:
self.app.notify("No device connected", severity="warning")
return
self._do_system_refresh()
def _handle_refresh_nvs(self) -> None:
"""Kick off an NVS-only refresh."""
if self._device is None:
self.app.notify("No device connected", severity="warning")
return
self._do_nvs_refresh()
def _export_nvs_json(self) -> None:
"""Export parsed NVS rows to /tmp/birdcage_nvs.json."""
nvs_table = self.query_one("#nvs-table", NvsTable)
rows = nvs_table.parsed_rows
if not rows:
self.app.notify(
"No NVS data to export -- refresh first", severity="warning"
)
return
output = Path("/tmp/birdcage_nvs.json")
try:
with output.open("w") as fh:
json.dump(rows, fh, indent=2)
self.app.notify(f"Exported {len(rows)} NVS entries to {output}")
except OSError as exc:
log.exception("NVS JSON export failed")
self.app.notify(f"Export failed: {exc}", severity="error")

View File

@ -0,0 +1,496 @@
/* Birdcage TUI — Dark RF Theme
* Teal accent on deep blue-black. No purple.
* Signal gradient: blue > cyan > green > yellow > red
*/
/* ── Global ────────────────────────────────────────── */
Screen {
background: #0a0a12;
color: #c8d0d8;
}
Header {
background: #0e1420;
color: #00d4aa;
text-style: bold;
dock: top;
height: 1;
}
Footer {
background: #0e1420;
color: #506878;
dock: bottom;
height: 1;
}
/* ── Layout Containers ─────────────────────────────── */
#main-area {
layout: horizontal;
height: 1fr;
}
#sidebar {
width: 26;
background: #0e1420;
border-right: solid #1a2a3a;
padding: 1 1;
}
.sidebar-title {
color: #00d4aa;
text-style: bold;
text-align: center;
width: 100%;
}
.sidebar-subtitle {
color: #506878;
text-align: center;
width: 100%;
margin: 0 0 1 0;
}
#content-area {
width: 1fr;
}
ContentSwitcher {
width: 1fr;
height: 1fr;
}
/* ── Sidebar Buttons ───────────────────────────────── */
.sidebar-btn {
width: 100%;
height: 3;
margin: 0 0 1 0;
background: #121c2a;
color: #7090a8;
text-style: bold;
border: round #1a3050;
text-align: center;
}
.sidebar-btn:hover {
background: #1a2a40;
color: #00d4aa;
border: round #00d4aa;
}
.sidebar-btn.active {
background: #0a2a3a;
color: #00d4aa;
border: round #00d4aa;
text-style: bold;
}
/* ── Panel / Card ──────────────────────────────────── */
.panel {
background: #0e1420;
border: round #1a2a3a;
padding: 1 2;
margin: 0 1 1 1;
}
.panel-title {
color: #00d4aa;
text-style: bold;
margin-bottom: 1;
}
/* ── Data Display ──────────────────────────────────── */
.value-large {
color: #00d4aa;
text-style: bold;
}
.value-normal {
color: #c8d0d8;
}
.label {
color: #506878;
}
.label-dim {
color: #384858;
}
/* ── Status Indicators ─────────────────────────────── */
.status-ok {
color: #00e060;
}
.status-warn {
color: #e8a020;
}
.status-error {
color: #e04040;
}
.status-demo {
color: #e8a020;
text-style: italic;
}
/* ── Input Controls ────────────────────────────────── */
Input {
background: #121c2a;
border: round #1a3050;
color: #c8d0d8;
padding: 0 1;
}
Input:focus {
border: round #00d4aa;
}
Button {
background: #1a2a40;
color: #00d4aa;
border: round #1a3050;
min-width: 10;
height: 3;
}
Button:hover {
background: #00d4aa;
color: #0a0a12;
border: round #00d4aa;
}
Button:focus {
border: round #00d4aa;
text-style: bold;
}
Button.-active {
background: #0a3a3a;
}
/* ── DataTable ─────────────────────────────────────── */
DataTable {
background: #0a0a12;
color: #c8d0d8;
height: 1fr;
}
DataTable > .datatable--header {
background: #0e1420;
color: #00d4aa;
text-style: bold;
}
DataTable > .datatable--cursor {
background: #142030;
color: #c8d0d8;
}
DataTable > .datatable--even-row {
background: #0a0a12;
}
DataTable > .datatable--odd-row {
background: #0c0e18;
}
/* ── RichLog ───────────────────────────────────────── */
RichLog {
background: #0a0a12;
color: #c8d0d8;
border: round #1a2a3a;
scrollbar-color: #1a2a38;
scrollbar-color-active: #00d4aa;
scrollbar-color-hover: #2a4a58;
height: 1fr;
}
/* ── Progress Bar ──────────────────────────────────── */
ProgressBar {
padding: 0 1;
}
ProgressBar Bar {
color: #00d4aa;
background: #1a2a38;
}
ProgressBar PercentageStatus {
color: #506878;
text-style: bold;
}
/* ── Sparkline ─────────────────────────────────────── */
.sparkline {
height: 2;
padding: 0 1;
color: #00d4aa;
background: #0e1420;
}
.sparkline-label {
color: #506878;
width: 14;
}
/* ── Compass Rose ──────────────────────────────────── */
#compass-container {
height: auto;
min-height: 14;
padding: 1;
}
.compass-readout {
color: #00d4aa;
text-style: bold;
}
/* ── Signal Gauge ──────────────────────────────────── */
.gauge-container {
height: 3;
padding: 0 1;
}
.gauge-bar {
height: 1;
}
.gauge-label {
color: #506878;
width: 8;
}
.gauge-value {
color: #c8d0d8;
width: 8;
text-align: right;
}
/* ── Signal Colors (gradient: blue > cyan > green > yellow > red) ── */
.signal-cold {
color: #2080d0;
}
.signal-cool {
color: #00b8c8;
}
.signal-mid {
color: #00e060;
}
.signal-warm {
color: #e8c020;
}
.signal-hot {
color: #e04040;
}
/* ── Sky Heatmap ───────────────────────────────────── */
.heatmap-container {
height: 1fr;
padding: 0;
}
.heatmap-cell {
width: 2;
height: 1;
}
/* ── Motor Status ──────────────────────────────────── */
.motor-panel {
height: auto;
padding: 1 2;
background: #0e1420;
border: round #1a2a3a;
}
.motor-row {
layout: horizontal;
height: 1;
}
.motor-label {
color: #506878;
width: 14;
}
.motor-value {
color: #c8d0d8;
}
/* ── Device Status Bar (sidebar bottom) ────────────── */
#device-status {
dock: bottom;
height: auto;
padding: 1;
background: #0e1420;
border-top: solid #1a2a38;
}
.device-status-label {
color: #506878;
}
.device-status-value {
color: #c8d0d8;
}
.device-connected {
color: #00e060;
}
.device-demo {
color: #e8a020;
}
/* ── Console Screen ────────────────────────────────── */
.console-input-area {
dock: bottom;
height: 3;
layout: horizontal;
padding: 0 1;
background: #0e1420;
border-top: solid #1a2a38;
}
.console-input-area Input {
width: 1fr;
}
.console-input-area Button {
width: 10;
margin-left: 1;
}
.console-context {
dock: bottom;
height: 1;
padding: 0 1;
background: #0e1420;
color: #506878;
}
/* ── Prompt Colors (by submenu) ────────────────────── */
.prompt-trk {
color: #00d4aa;
}
.prompt-mot {
color: #00e060;
}
.prompt-dvb {
color: #2080d0;
}
.prompt-nvs {
color: #e8a020;
}
.prompt-a3981 {
color: #00b8c8;
}
.prompt-step {
color: #40c0a0;
}
.prompt-os {
color: #8090a0;
}
.prompt-other {
color: #506878;
}
/* ── Scan Screen ───────────────────────────────────── */
.scan-controls {
dock: bottom;
height: auto;
padding: 1;
background: #0e1420;
border-top: solid #1a2a38;
}
.scan-status {
height: 2;
padding: 0 1;
color: #506878;
}
/* ── Screen-Level Layouts ──────────────────────────── */
.screen-container {
layout: vertical;
height: 1fr;
width: 1fr;
}
.top-row {
layout: horizontal;
height: 1fr;
}
.bottom-controls {
dock: bottom;
height: auto;
padding: 1;
background: #0e1420;
border-top: solid #1a2a38;
layout: horizontal;
}
.control-group {
layout: horizontal;
height: 3;
width: 1fr;
}
.control-group Input {
width: 8;
margin-right: 1;
}
.control-group Button {
margin-right: 1;
}
/* ── NVS Table Highlight ───────────────────────────── */
.nvs-modified {
color: #e8a020;
text-style: bold;
}
/* ── Scrollbar Styling ─────────────────────────────── */
* {
scrollbar-color: #1a2a38;
scrollbar-color-active: #00d4aa;
scrollbar-color-hover: #2a4a58;
scrollbar-background: #0a0a12;
}

View File

@ -0,0 +1,21 @@
"""Custom widgets for the Birdcage TUI."""
from birdcage_tui.widgets.compass_rose import CompassRose
from birdcage_tui.widgets.device_status_bar import DeviceStatusBar
from birdcage_tui.widgets.motor_status import MotorStatus
from birdcage_tui.widgets.nvs_table import NvsTable
from birdcage_tui.widgets.serial_log import SerialLog
from birdcage_tui.widgets.signal_gauge import SignalGauge
from birdcage_tui.widgets.sky_heatmap import SkyHeatmap
from birdcage_tui.widgets.sparkline_widget import SparklineWidget
__all__ = [
"CompassRose",
"DeviceStatusBar",
"MotorStatus",
"NvsTable",
"SerialLog",
"SignalGauge",
"SkyHeatmap",
"SparklineWidget",
]

View File

@ -0,0 +1,183 @@
"""Compass rose widget — visual AZ/EL position display with Unicode compass dial."""
from rich.text import Text
from textual.reactive import reactive
from textual.widgets import Static
# Compass grid layout: 11 columns x 7 rows.
# Positions indexed [row][col] where (0,0) is top-left.
# Cardinal/intercardinal markers are placed at fixed positions.
# The pointer occupies one of 16 perimeter slots based on azimuth.
# 16-slot perimeter positions (clockwise from N=0):
# Each entry is (row, col) on the 11x7 grid.
_POINTER_SLOTS: list[tuple[int, int]] = [
(0, 5), # 0: N
(0, 7), # 1: NNE
(1, 9), # 2: NE
(2, 10), # 3: ENE
(3, 10), # 4: E
(4, 10), # 5: ESE
(5, 9), # 6: SE
(6, 7), # 7: SSE
(6, 5), # 8: S
(6, 3), # 9: SSW
(5, 1), # 10: SW
(4, 0), # 11: WSW
(3, 0), # 12: W
(2, 0), # 13: WNW
(1, 1), # 14: NW
(0, 3), # 15: NNW
]
# Fixed cardinal/intercardinal label positions: (row, col, label)
_LABELS: list[tuple[int, int, str]] = [
(0, 5, "N"),
(3, 10, "E"),
(6, 5, "S"),
(3, 0, "W"),
]
# Ring structure characters for the compass dial.
_RING_CHARS: dict[tuple[int, int], str] = {
# Top arc
(0, 3): ".",
(0, 4): "\u2500",
(0, 6): "\u2500",
(0, 7): ".",
# Upper sides
(1, 1): "/",
(1, 9): "\\",
# Mid-upper sides
(2, 0): "\u2502",
(2, 10): "\u2502",
# Center sides (cardinals placed separately)
# (3, 0) and (3, 10) reserved for W/E labels
# Lower-mid sides
(4, 0): "\u2502",
(4, 10): "\u2502",
# Lower sides
(5, 1): "\\",
(5, 9): "/",
# Bottom arc
(6, 3): "'",
(6, 4): "\u2500",
(6, 6): "\u2500",
(6, 7): "'",
}
def _azimuth_to_slot(az: float) -> int:
"""Map azimuth (0-360, 0=N clockwise) to one of 16 perimeter slots."""
normalized = az % 360.0
slot = round(normalized / 22.5) % 16
return slot
class CompassRose(Static):
"""Visual compass display showing azimuth/elevation position."""
azimuth: reactive[float] = reactive(180.0)
elevation: reactive[float] = reactive(45.0)
def render(self) -> Text:
result = Text()
# Large numeric readout
az_label = Text("AZ ", style="#506878 bold")
az_value = Text(f"{self.azimuth:7.2f}\u00b0", style="#00d4aa bold")
el_label = Text(" EL ", style="#506878 bold")
el_value = Text(f"{self.elevation:6.2f}\u00b0", style="#00d4aa bold")
result.append(az_label)
result.append(az_value)
result.append(el_label)
result.append(el_value)
result.append("\n\n")
# Build the 7x11 compass grid
grid: list[list[tuple[str, str]]] = [
[(" ", "#0e1420") for _ in range(11)] for _ in range(7)
]
# Place ring structure
for (r, c), ch in _RING_CHARS.items():
grid[r][c] = (ch, "#c8d0d8")
# Place cardinal labels
for r, c, label in _LABELS:
grid[r][c] = (label, "#506878 bold")
# Center crosshair
grid[3][5] = ("\u253c", "#1a2a38")
grid[3][4] = ("\u2500", "#1a2a38")
grid[3][6] = ("\u2500", "#1a2a38")
grid[2][5] = ("\u2502", "#1a2a38")
grid[4][5] = ("\u2502", "#1a2a38")
# Place pointer at azimuth position
slot = _azimuth_to_slot(self.azimuth)
pr, pc = _POINTER_SLOTS[slot]
# Use a filled diamond for the pointer
grid[pr][pc] = ("\u25c6", "#00d4aa bold")
# Compute a line from center toward the pointer direction for visual clarity
# Place a dot at an intermediate position between center (3,5) and pointer
cr, cc = 3, 5
dr = pr - cr
dc = pc - cc
if abs(dr) > 1 or abs(dc) > 1:
mr = cr + (1 if dr > 0 else (-1 if dr < 0 else 0))
mc = cc + (1 if dc > 0 else (-1 if dc < 0 else 0))
# Only place intermediate dot if it doesn't overwrite a label
existing_ch = grid[mr][mc][0]
if existing_ch in (" ", "\u2500", "\u2502", "\u253c"):
grid[mr][mc] = ("\u2022", "#00d4aa")
# Render grid to text
for row_idx, row in enumerate(grid):
for _col_idx, (ch, style) in enumerate(row):
result.append(ch, style=style)
if row_idx < 6:
result.append("\n")
# Bearing line below compass
bearing = self.azimuth % 360.0
if bearing < 0:
bearing += 360.0
cardinal = _bearing_to_cardinal(bearing)
result.append("\n")
result.append(f" {cardinal:>5s}", style="#506878")
result.append(f" {bearing:05.1f}\u00b0", style="#c8d0d8")
return result
def watch_azimuth(self, _value: float) -> None:
self.refresh()
def watch_elevation(self, _value: float) -> None:
self.refresh()
def _bearing_to_cardinal(bearing: float) -> str:
"""Convert bearing in degrees to 16-point cardinal abbreviation."""
directions = [
"N",
"NNE",
"NE",
"ENE",
"E",
"ESE",
"SE",
"SSE",
"S",
"SSW",
"SW",
"WSW",
"W",
"WNW",
"NW",
"NNW",
]
idx = round(bearing / 22.5) % 16
return directions[idx]

View File

@ -0,0 +1,92 @@
"""Device status bar widget — sidebar display of connection state and firmware info."""
from rich.text import Text
from textual.widgets import Static
class DeviceStatusBar(Static):
"""Sidebar status display showing connection state and firmware info."""
def __init__(self, **kwargs) -> None:
super().__init__(**kwargs)
self._connected: bool = False
self._demo: bool = False
self._firmware: str = "---"
self._submenu: str = "---"
self._port: str = "---"
def set_device(self, device: object) -> None:
"""Accept a device reference and update the status display."""
is_demo = hasattr(device, "demo_mode") or type(device).__name__ == "DemoDevice"
fw = getattr(device, "firmware_id", "02.02.48") if device else "---"
port = getattr(self.app, "serial_port", "/dev/ttyUSB0") if self.app else "---"
submenu = getattr(device, "current_menu", "TRK>") if device else "---"
connected = device is not None and not is_demo
self.update_status(
connected=connected,
demo=is_demo,
firmware=str(fw),
submenu=str(submenu),
port=str(port),
)
def update_status(
self,
connected: bool,
demo: bool,
firmware: str,
submenu: str,
port: str,
) -> None:
"""Update all status fields and refresh the display."""
self._connected = connected
self._demo = demo
self._firmware = firmware
self._submenu = submenu
self._port = port
self.refresh()
def render(self) -> Text:
result = Text()
label_w = 8
# Status row
result.append("Status".ljust(label_w), style="#506878")
if self._connected:
result.append("Connected", style="#00e060 bold")
elif self._demo:
result.append("Demo", style="#e8a020 italic")
else:
result.append("Offline", style="#e04040")
result.append("\n")
# Port row
result.append("Port".ljust(label_w), style="#506878")
result.append(self._port, style="#c8d0d8")
result.append("\n")
# Firmware row
result.append("FW".ljust(label_w), style="#506878")
result.append(self._firmware, style="#c8d0d8")
result.append("\n")
# Menu row
result.append("Menu".ljust(label_w), style="#506878")
# Color the menu prompt with its matching prompt color
submenu_colors: dict[str, str] = {
"TRK>": "#00d4aa",
"MOT>": "#00e060",
"DVB>": "#2080d0",
"NVS>": "#e8a020",
"A3981>": "#00b8c8",
"STEP>": "#40c0a0",
"EE>": "#e8a020",
"OS>": "#8090a0",
"ADC>": "#00b8c8",
"GPIO>": "#40c0a0",
"PEAK>": "#e8c020",
}
menu_color = submenu_colors.get(self._submenu, "#c8d0d8")
result.append(self._submenu, style=f"{menu_color} bold")
return result

View File

@ -0,0 +1,76 @@
"""Motor status widget — engagement state, torque, step counts, and EL range."""
from rich.text import Text
from textual.reactive import reactive
from textual.widgets import Static
class MotorStatus(Static):
"""Panel showing motor engagement state, torque, and step counts."""
engaged: reactive[bool] = reactive(False)
az_torque: reactive[str] = reactive("LOW")
el_torque: reactive[str] = reactive("LOW")
az_steps: reactive[int] = reactive(0)
el_steps: reactive[int] = reactive(0)
el_min: reactive[float] = reactive(18.0)
el_max: reactive[float] = reactive(65.0)
def render(self) -> Text:
result = Text()
label_w = 10
# Engaged row
result.append("Engaged".ljust(label_w), style="#506878")
if self.engaged:
result.append("YES", style="#00e060 bold")
else:
result.append("NO", style="#e04040")
result.append("\n")
# Torque row
result.append("Torque".ljust(label_w), style="#506878")
result.append("AZ: ", style="#506878")
az_style = "#e8c020 bold" if self.az_torque == "HIGH" else "#c8d0d8"
result.append(f"{self.az_torque}", style=az_style)
result.append(" EL: ", style="#506878")
el_style = "#e8c020 bold" if self.el_torque == "HIGH" else "#c8d0d8"
result.append(f"{self.el_torque}", style=el_style)
result.append("\n")
# Steps row
result.append("Steps".ljust(label_w), style="#506878")
result.append("AZ: ", style="#506878")
result.append(f"{self.az_steps}", style="#c8d0d8")
result.append(" EL: ", style="#506878")
result.append(f"{self.el_steps}", style="#c8d0d8")
result.append("\n")
# EL Range row
result.append("EL Range".ljust(label_w), style="#506878")
result.append(f"{self.el_min:.1f}\u00b0", style="#c8d0d8")
result.append(" \u2013 ", style="#506878")
result.append(f"{self.el_max:.1f}\u00b0", style="#c8d0d8")
return result
def watch_engaged(self, _value: bool) -> None:
self.refresh()
def watch_az_torque(self, _value: str) -> None:
self.refresh()
def watch_el_torque(self, _value: str) -> None:
self.refresh()
def watch_az_steps(self, _value: int) -> None:
self.refresh()
def watch_el_steps(self, _value: int) -> None:
self.refresh()
def watch_el_min(self, _value: float) -> None:
self.refresh()
def watch_el_max(self, _value: float) -> None:
self.refresh()

View File

@ -0,0 +1,93 @@
"""NVS table widget — DataTable wrapper for non-volatile storage dump display."""
import re
from textual.widgets import DataTable
# Regex to parse NVS dump lines.
# Examples:
# 0) Log ID's 0x00000007 0x00000007 0x00000007
# 20) Disable Tracker Proc? TRUE TRUE FALSE
# 101) Minimum Elevation Angle 18.00 18.00 18.00
_NVS_LINE_RE = re.compile(
r"^\s*(\d+)\)\s+" # index with closing paren
r"(.+?)\s{2,}" # name (greedy until 2+ spaces)
r"(\S+)\s+" # current value
r"(\S+)\s+" # saved value
r"(\S+)\s*$" # default value
)
class NvsTable(DataTable):
"""DataTable displaying NVS (non-volatile storage) dump data."""
def __init__(self, **kwargs) -> None:
super().__init__(**kwargs)
self._parsed_rows: list[dict[str, str]] = []
self._columns_added = False
def on_mount(self) -> None:
"""Add columns when the widget is mounted."""
if not self._columns_added:
self.add_columns("Idx", "Name", "Current", "Saved", "Default")
self._columns_added = True
def load_nvs(self, text: str) -> list[dict[str, str]]:
"""Parse NVS dump text and populate the table.
Returns a list of dicts with keys: idx, name, current, saved, default.
Rows where current != default are marked for the screen to highlight.
"""
self.clear_table()
self._parsed_rows = []
for line in text.splitlines():
line = line.rstrip()
if not line:
continue
match = _NVS_LINE_RE.match(line)
if not match:
continue
idx = match.group(1)
name = match.group(2).strip()
current = match.group(3)
saved = match.group(4)
default = match.group(5)
row_data = {
"idx": idx,
"name": name,
"current": current,
"saved": saved,
"default": default,
}
self._parsed_rows.append(row_data)
# Add row to the DataTable
modified = current != default
# Prefix the index cell to signal modification to the screen.
# The screen's CSS rule .nvs-modified handles styling.
label = f"*{idx}" if modified else idx
self.add_row(label, name, current, saved, default, key=f"nvs-{idx}")
return self._parsed_rows
def clear_table(self) -> None:
"""Remove all rows from the table."""
self.clear()
self._parsed_rows = []
@property
def parsed_rows(self) -> list[dict[str, str]]:
"""Access the most recently parsed NVS data."""
return list(self._parsed_rows)
@property
def modified_indices(self) -> list[str]:
"""Return indices where current value differs from default."""
return [
row["idx"] for row in self._parsed_rows if row["current"] != row["default"]
]

View File

@ -0,0 +1,84 @@
"""Serial log widget — RichLog with color-coded firmware console prompts."""
import re
from rich.text import Text
from textual.widgets import RichLog
# Prompt patterns and their colors, ordered by specificity (longest match first).
_PROMPT_STYLES: list[tuple[str, str]] = [
("A3981>", "#00b8c8"),
("STEP>", "#40c0a0"),
("TRK>", "#00d4aa"),
("MOT>", "#00e060"),
("DVB>", "#2080d0"),
("NVS>", "#e8a020"),
("EE>", "#e8a020"),
("OS>", "#8090a0"),
("ADC>", "#00b8c8"),
("GPIO>", "#40c0a0"),
("PEAK>", "#e8c020"),
("LATLON>", "#506878"),
("DIPSWITCH>", "#506878"),
]
# Build a regex that matches any known prompt at any position in the text.
_PROMPT_PATTERN = re.compile(
r"(" + "|".join(re.escape(p) for p, _ in _PROMPT_STYLES) + r")"
)
# Lookup dict for color by prompt string
_PROMPT_COLOR: dict[str, str] = {p: c for p, c in _PROMPT_STYLES}
class SerialLog(RichLog):
"""RichLog that color-codes Winegard firmware console prompts."""
def __init__(self, **kwargs) -> None:
super().__init__(markup=False, wrap=True, **kwargs)
def append_output(self, text: str) -> None:
"""Append firmware output with color-coded prompts.
Each line is scanned for known prompt strings (TRK>, MOT>, etc.)
which are rendered in their assigned color. All other text uses
the default terminal color.
"""
for line in text.splitlines():
if not line:
continue
styled = _colorize_line(line)
self.write(styled)
def append_command(self, cmd: str) -> None:
"""Append a user-issued command, formatted with a prompt indicator."""
styled = Text()
styled.append("> ", style="#00d4aa bold")
styled.append(cmd, style="#00d4aa")
self.write(styled)
def _colorize_line(line: str) -> Text:
"""Parse a single line and return a Rich Text with colored prompt spans."""
result = Text()
last_end = 0
for match in _PROMPT_PATTERN.finditer(line):
start, end = match.span()
prompt_str = match.group(1)
color = _PROMPT_COLOR[prompt_str]
# Text before the prompt
if start > last_end:
result.append(line[last_end:start], style="#c8d0d8")
# The prompt itself
result.append(prompt_str, style=f"{color} bold")
last_end = end
# Remaining text after last prompt
if last_end < len(line):
result.append(line[last_end:], style="#c8d0d8")
return result

View File

@ -0,0 +1,90 @@
"""Signal gauge widget — horizontal RSSI bar with color-coded thresholds."""
from rich.text import Text
from textual.reactive import reactive
from textual.widgets import Static
# RSSI color thresholds (upper bound, color)
_THRESHOLDS: list[tuple[int, str]] = [
(500, "#2080d0"), # cold — noise floor
(1000, "#00b8c8"), # cool — weak signal
(2000, "#00e060"), # mid — usable
(3000, "#e8c020"), # warm — strong
(4096, "#e04040"), # hot — saturating
]
BAR_WIDTH = 40
MAX_RSSI = 4096
# Sub-character bar fragments for smooth rendering (8 levels per cell)
_BAR_CHARS = " ▏▎▍▌▋▊▉"
_FULL = "\u2588" # █
def _rssi_color(rssi: int) -> str:
"""Return the color string for a given RSSI value."""
for threshold, color in _THRESHOLDS:
if rssi <= threshold:
return color
return _THRESHOLDS[-1][1]
class SignalGauge(Static):
"""Horizontal RSSI signal strength bar gauge."""
rssi_avg: reactive[int] = reactive(0)
rssi_cur: reactive[int] = reactive(0)
reads: reactive[int] = reactive(0)
def render(self) -> Text:
result = Text()
# Title
result.append("RSSI", style="#506878 bold")
result.append("\n")
# Compute fill with sub-character precision (8 levels per cell = 320 positions)
clamped = max(0, min(self.rssi_cur, MAX_RSSI))
fill_frac = clamped / MAX_RSSI * BAR_WIDTH
full_cells = int(fill_frac)
partial = fill_frac - full_cells
partial_idx = int(partial * 8)
# Build the bar with per-character color based on position thresholds
for i in range(full_cells):
pos_rssi = round((i + 0.5) / BAR_WIDTH * MAX_RSSI)
color = _rssi_color(pos_rssi)
result.append(_FULL, style=color)
# Partial sub-character cell
remaining = BAR_WIDTH - full_cells
if remaining > 0 and partial_idx > 0:
pos_rssi = round((full_cells + 0.5) / BAR_WIDTH * MAX_RSSI)
color = _rssi_color(pos_rssi)
result.append(_BAR_CHARS[partial_idx], style=color)
remaining -= 1
result.append("\u2591" * remaining, style="#1a2a38")
# Numeric value at end of bar
result.append(f" {self.rssi_cur}", style=_rssi_color(self.rssi_cur))
result.append("\n")
# Label line
result.append("avg: ", style="#506878")
result.append(f"{self.rssi_avg}", style="#c8d0d8")
result.append(" cur: ", style="#506878")
result.append(f"{self.rssi_cur}", style="#c8d0d8")
result.append(" reads: ", style="#506878")
result.append(f"{self.reads}", style="#c8d0d8")
return result
def watch_rssi_avg(self, _value: int) -> None:
self.refresh()
def watch_rssi_cur(self, _value: int) -> None:
self.refresh()
def watch_reads(self, _value: int) -> None:
self.refresh()

View File

@ -0,0 +1,123 @@
"""Sky heatmap widget — 2D AZ x EL grid colored by RSSI for sky scan visualization."""
from rich.text import Text
from textual.widgets import Static
# RSSI color thresholds matching signal_gauge.py
_THRESHOLDS: list[tuple[float, str]] = [
(500.0, "#2080d0"),
(1000.0, "#00b8c8"),
(2000.0, "#00e060"),
(3000.0, "#e8c020"),
(4096.0, "#e04040"),
]
_ZERO_COLOR = "#0e1420"
def _rssi_color(rssi: float) -> str:
"""Return the color string for a given RSSI value."""
if rssi <= 0:
return _ZERO_COLOR
for threshold, color in _THRESHOLDS:
if rssi <= threshold:
return color
return _THRESHOLDS[-1][1]
class SkyHeatmap(Static):
"""2D azimuth x elevation grid colored by RSSI for sky scan visualization."""
def __init__(
self,
az_bins: int = 40,
el_bins: int = 10,
**kwargs,
) -> None:
super().__init__(**kwargs)
self._az_bins = az_bins
self._el_bins = el_bins
self._grid: list[list[float]] = [[0.0] * az_bins for _ in range(el_bins)]
self._active_az: int | None = None
self._active_el: int | None = None
def set_point(self, az_idx: int, el_idx: int, rssi: float) -> None:
"""Set RSSI value at a grid cell. Does not refresh — call refresh() explicitly
or batch updates and refresh once."""
if 0 <= el_idx < self._el_bins and 0 <= az_idx < self._az_bins:
self._grid[el_idx][az_idx] = rssi
def set_active(self, az_idx: int, el_idx: int) -> None:
"""Highlight the current scan position and refresh."""
self._active_az = az_idx
self._active_el = el_idx
self.refresh()
def clear(self) -> None:
"""Reset all RSSI values to zero and clear active position."""
for row in self._grid:
for i in range(len(row)):
row[i] = 0.0
self._active_az = None
self._active_el = None
self.refresh()
def render(self) -> Text:
result = Text()
# Column header: AZ labels (every 5 bins)
# Left gutter for EL labels
gutter = 5
result.append(" " * gutter, style="#0e1420")
for az in range(self._az_bins):
if az % 5 == 0:
label = str(az)
result.append(label, style="#506878")
# Pad to maintain 1-char-per-bin spacing
pad = 1 - len(label)
if pad > 0:
result.append(" " * pad)
else:
result.append(" ")
result.append("\n")
# Grid rows: highest EL at top
for el_idx in range(self._el_bins - 1, -1, -1):
# EL label
el_label = f"{el_idx:>3d} "
result.append(el_label, style="#506878")
result.append("\u2502", style="#1a2a38")
for az_idx in range(self._az_bins):
rssi = self._grid[el_idx][az_idx]
is_active = az_idx == self._active_az and el_idx == self._active_el
if is_active:
# Active scan position: bright white on dark background
result.append("\u2588", style="bold #ffffff on #1a2a38")
elif rssi <= 0:
# Empty cell
result.append("\u2591", style="#0e1420")
else:
color = _rssi_color(rssi)
# Use denser block for higher RSSI
ch = "\u2593" if rssi < 500 else "\u2588"
result.append(ch, style=color)
if el_idx > 0:
result.append("\n")
# Bottom border
result.append("\n")
result.append(" " * gutter, style="#0e1420")
result.append("\u2500" * self._az_bins, style="#1a2a38")
return result
@property
def az_bins(self) -> int:
return self._az_bins
@property
def el_bins(self) -> int:
return self._el_bins

View File

@ -0,0 +1,74 @@
"""Sparkline widget — rolling time series using Unicode block characters."""
from collections import deque
from rich.text import Text
from textual.widgets import Static
# 8-level vertical block characters for sparkline rendering.
# Index 0 = lowest bar, index 7 = tallest bar.
_BLOCKS = "\u2581\u2582\u2583\u2584\u2585\u2586\u2587\u2588"
class SparklineWidget(Static):
"""Rolling sparkline time series display."""
def __init__(
self,
max_points: int = 60,
label: str = "",
color: str = "#00d4aa",
**kwargs,
) -> None:
super().__init__(**kwargs)
self._max_points = max_points
self._label = label
self._color = color
self._buffer: deque[float] = deque(maxlen=max_points)
def push(self, value: float) -> None:
"""Add a data point to the sparkline buffer and refresh."""
self._buffer.append(value)
self.refresh()
def render(self) -> Text:
result = Text()
# Label prefix
if self._label:
result.append(f"{self._label} ", style="#506878")
if not self._buffer:
result.append("\u2581" * self._max_points, style="#1a2a38")
return result
values = list(self._buffer)
lo = min(values)
hi = max(values)
span = hi - lo
for v in values:
if span <= 0:
# All values identical — render as mid-level
idx = 3
else:
normalized = (v - lo) / span
idx = min(int(normalized * 7.999), 7)
result.append(_BLOCKS[idx], style=self._color)
# Pad remaining width with low blocks if buffer not full
remaining = self._max_points - len(values)
if remaining > 0:
result.append(_BLOCKS[0] * remaining, style="#1a2a38")
# Min/max annotation
result.append("\n")
if self._label:
result.append(" " * (len(self._label) + 1))
result.append(f"{lo:.0f}", style="#506878")
gap = self._max_points - len(f"{lo:.0f}") - len(f"{hi:.0f}")
if gap > 0:
result.append(" " * gap)
result.append(f"{hi:.0f}", style="#506878")
return result

179
tui/uv.lock generated Normal file
View File

@ -0,0 +1,179 @@
version = 1
revision = 3
requires-python = ">=3.11"
[[package]]
name = "birdcage"
version = "2026.2.12.1"
source = { directory = "../" }
dependencies = [
{ name = "click" },
{ name = "pyserial" },
]
[package.metadata]
requires-dist = [
{ name = "click", specifier = ">=8.0" },
{ name = "pyserial", specifier = ">=3.5" },
]
[[package]]
name = "birdcage-tui"
version = "2026.2.13"
source = { editable = "." }
dependencies = [
{ name = "birdcage" },
{ name = "textual" },
]
[package.metadata]
requires-dist = [
{ name = "birdcage", directory = "../" },
{ name = "textual", specifier = ">=1.0.0" },
]
[[package]]
name = "click"
version = "8.3.1"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "colorama", marker = "sys_platform == 'win32'" },
]
sdist = { url = "https://files.pythonhosted.org/packages/3d/fa/656b739db8587d7b5dfa22e22ed02566950fbfbcdc20311993483657a5c0/click-8.3.1.tar.gz", hash = "sha256:12ff4785d337a1bb490bb7e9c2b1ee5da3112e94a8622f26a6c77f5d2fc6842a", size = 295065, upload-time = "2025-11-15T20:45:42.706Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/98/78/01c019cdb5d6498122777c1a43056ebb3ebfeef2076d9d026bfe15583b2b/click-8.3.1-py3-none-any.whl", hash = "sha256:981153a64e25f12d547d3426c367a4857371575ee7ad18df2a6183ab0545b2a6", size = 108274, upload-time = "2025-11-15T20:45:41.139Z" },
]
[[package]]
name = "colorama"
version = "0.4.6"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/d8/53/6f443c9a4a8358a93a6792e2acffb9d9d5cb0a5cfd8802644b7b1c9a02e4/colorama-0.4.6.tar.gz", hash = "sha256:08695f5cb7ed6e0531a20572697297273c47b8cae5a63ffc6d6ed5c201be6e44", size = 27697, upload-time = "2022-10-25T02:36:22.414Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/d1/d6/3965ed04c63042e047cb6a3e6ed1a63a35087b6a609aa3a15ed8ac56c221/colorama-0.4.6-py2.py3-none-any.whl", hash = "sha256:4f1d9991f5acc0ca119f9d443620b77f9d6b33703e51011c16baf57afb285fc6", size = 25335, upload-time = "2022-10-25T02:36:20.889Z" },
]
[[package]]
name = "linkify-it-py"
version = "2.0.3"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "uc-micro-py" },
]
sdist = { url = "https://files.pythonhosted.org/packages/2a/ae/bb56c6828e4797ba5a4821eec7c43b8bf40f69cda4d4f5f8c8a2810ec96a/linkify-it-py-2.0.3.tar.gz", hash = "sha256:68cda27e162e9215c17d786649d1da0021a451bdc436ef9e0fa0ba5234b9b048", size = 27946, upload-time = "2024-02-04T14:48:04.179Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/04/1e/b832de447dee8b582cac175871d2f6c3d5077cc56d5575cadba1fd1cccfa/linkify_it_py-2.0.3-py3-none-any.whl", hash = "sha256:6bcbc417b0ac14323382aef5c5192c0075bf8a9d6b41820a2b66371eac6b6d79", size = 19820, upload-time = "2024-02-04T14:48:02.496Z" },
]
[[package]]
name = "markdown-it-py"
version = "4.0.0"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "mdurl" },
]
sdist = { url = "https://files.pythonhosted.org/packages/5b/f5/4ec618ed16cc4f8fb3b701563655a69816155e79e24a17b651541804721d/markdown_it_py-4.0.0.tar.gz", hash = "sha256:cb0a2b4aa34f932c007117b194e945bd74e0ec24133ceb5bac59009cda1cb9f3", size = 73070, upload-time = "2025-08-11T12:57:52.854Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/94/54/e7d793b573f298e1c9013b8c4dade17d481164aa517d1d7148619c2cedbf/markdown_it_py-4.0.0-py3-none-any.whl", hash = "sha256:87327c59b172c5011896038353a81343b6754500a08cd7a4973bb48c6d578147", size = 87321, upload-time = "2025-08-11T12:57:51.923Z" },
]
[package.optional-dependencies]
linkify = [
{ name = "linkify-it-py" },
]
[[package]]
name = "mdit-py-plugins"
version = "0.5.0"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "markdown-it-py" },
]
sdist = { url = "https://files.pythonhosted.org/packages/b2/fd/a756d36c0bfba5f6e39a1cdbdbfdd448dc02692467d83816dff4592a1ebc/mdit_py_plugins-0.5.0.tar.gz", hash = "sha256:f4918cb50119f50446560513a8e311d574ff6aaed72606ddae6d35716fe809c6", size = 44655, upload-time = "2025-08-11T07:25:49.083Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/fb/86/dd6e5db36df29e76c7a7699123569a4a18c1623ce68d826ed96c62643cae/mdit_py_plugins-0.5.0-py3-none-any.whl", hash = "sha256:07a08422fc1936a5d26d146759e9155ea466e842f5ab2f7d2266dd084c8dab1f", size = 57205, upload-time = "2025-08-11T07:25:47.597Z" },
]
[[package]]
name = "mdurl"
version = "0.1.2"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/d6/54/cfe61301667036ec958cb99bd3efefba235e65cdeb9c84d24a8293ba1d90/mdurl-0.1.2.tar.gz", hash = "sha256:bb413d29f5eea38f31dd4754dd7377d4465116fb207585f97bf925588687c1ba", size = 8729, upload-time = "2022-08-14T12:40:10.846Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/b3/38/89ba8ad64ae25be8de66a6d463314cf1eb366222074cfda9ee839c56a4b4/mdurl-0.1.2-py3-none-any.whl", hash = "sha256:84008a41e51615a49fc9966191ff91509e3c40b939176e643fd50a5c2196b8f8", size = 9979, upload-time = "2022-08-14T12:40:09.779Z" },
]
[[package]]
name = "platformdirs"
version = "4.7.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/71/25/ccd8e88fcd16a4eb6343a8b4b9635e6f3928a7ebcd82822a14d20e3ca29f/platformdirs-4.7.0.tar.gz", hash = "sha256:fd1a5f8599c85d49b9ac7d6e450bc2f1aaf4a23f1fe86d09952fe20ad365cf36", size = 23118, upload-time = "2026-02-12T22:21:53.764Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/cb/e3/1eddccb2c39ecfbe09b3add42a04abcc3fa5b468aa4224998ffb8a7e9c8f/platformdirs-4.7.0-py3-none-any.whl", hash = "sha256:1ed8db354e344c5bb6039cd727f096af975194b508e37177719d562b2b540ee6", size = 18983, upload-time = "2026-02-12T22:21:52.237Z" },
]
[[package]]
name = "pygments"
version = "2.19.2"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/b0/77/a5b8c569bf593b0140bde72ea885a803b82086995367bf2037de0159d924/pygments-2.19.2.tar.gz", hash = "sha256:636cb2477cec7f8952536970bc533bc43743542f70392ae026374600add5b887", size = 4968631, upload-time = "2025-06-21T13:39:12.283Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/c7/21/705964c7812476f378728bdf590ca4b771ec72385c533964653c68e86bdc/pygments-2.19.2-py3-none-any.whl", hash = "sha256:86540386c03d588bb81d44bc3928634ff26449851e99741617ecb9037ee5ec0b", size = 1225217, upload-time = "2025-06-21T13:39:07.939Z" },
]
[[package]]
name = "pyserial"
version = "3.5"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/1e/7d/ae3f0a63f41e4d2f6cb66a5b57197850f919f59e558159a4dd3a818f5082/pyserial-3.5.tar.gz", hash = "sha256:3c77e014170dfffbd816e6ffc205e9842efb10be9f58ec16d3e8675b4925cddb", size = 159125, upload-time = "2020-11-23T03:59:15.045Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/07/bc/587a445451b253b285629263eb51c2d8e9bcea4fc97826266d186f96f558/pyserial-3.5-py2.py3-none-any.whl", hash = "sha256:c4451db6ba391ca6ca299fb3ec7bae67a5c55dde170964c7a14ceefec02f2cf0", size = 90585, upload-time = "2020-11-23T03:59:13.41Z" },
]
[[package]]
name = "rich"
version = "14.3.2"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "markdown-it-py" },
{ name = "pygments" },
]
sdist = { url = "https://files.pythonhosted.org/packages/74/99/a4cab2acbb884f80e558b0771e97e21e939c5dfb460f488d19df485e8298/rich-14.3.2.tar.gz", hash = "sha256:e712f11c1a562a11843306f5ed999475f09ac31ffb64281f73ab29ffdda8b3b8", size = 230143, upload-time = "2026-02-01T16:20:47.908Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/ef/45/615f5babd880b4bd7d405cc0dc348234c5ffb6ed1ea33e152ede08b2072d/rich-14.3.2-py3-none-any.whl", hash = "sha256:08e67c3e90884651da3239ea668222d19bea7b589149d8014a21c633420dbb69", size = 309963, upload-time = "2026-02-01T16:20:46.078Z" },
]
[[package]]
name = "textual"
version = "7.5.0"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "markdown-it-py", extra = ["linkify"] },
{ name = "mdit-py-plugins" },
{ name = "platformdirs" },
{ name = "pygments" },
{ name = "rich" },
{ name = "typing-extensions" },
]
sdist = { url = "https://files.pythonhosted.org/packages/9f/38/7d169a765993efde5095c70a668bf4f5831bb7ac099e932f2783e9b71abf/textual-7.5.0.tar.gz", hash = "sha256:c730cba1e3d704e8f1ca915b6a3af01451e3bca380114baacf6abf87e9dac8b6", size = 1592319, upload-time = "2026-01-30T13:46:39.881Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/9c/78/96ddb99933e11d91bc6e05edae23d2687e44213066bcbaca338898c73c47/textual-7.5.0-py3-none-any.whl", hash = "sha256:849dfee9d705eab3b2d07b33152b7bd74fb1f5056e002873cc448bce500c6374", size = 718164, upload-time = "2026-01-30T13:46:37.635Z" },
]
[[package]]
name = "typing-extensions"
version = "4.15.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/72/94/1a15dd82efb362ac84269196e94cf00f187f7ed21c242792a923cdb1c61f/typing_extensions-4.15.0.tar.gz", hash = "sha256:0cea48d173cc12fa28ecabc3b837ea3cf6f38c6d1136f85cbaaf598984861466", size = 109391, upload-time = "2025-08-25T13:49:26.313Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/18/67/36e9267722cc04a6b9f15c7f3441c2363321a3ea07da7ae0c0707beb2a9c/typing_extensions-4.15.0-py3-none-any.whl", hash = "sha256:f0fa19c6845758ab08074a0cfa8b7aecb71c999ca73d62883bc25cc018c4e548", size = 44614, upload-time = "2025-08-25T13:49:24.86Z" },
]
[[package]]
name = "uc-micro-py"
version = "1.0.3"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/91/7a/146a99696aee0609e3712f2b44c6274566bc368dfe8375191278045186b8/uc-micro-py-1.0.3.tar.gz", hash = "sha256:d321b92cff673ec58027c04015fcaa8bb1e005478643ff4a500882eaab88c48a", size = 6043, upload-time = "2024-02-09T16:52:01.654Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/37/87/1f677586e8ac487e29672e4b17455758fce261de06a0d086167bb760361a/uc_micro_py-1.0.3-py3-none-any.whl", hash = "sha256:db1dffff340817673d7b466ec86114a9dc0e9d4d9b5ba229d9d60e5c12600cd5", size = 6229, upload-time = "2024-02-09T16:52:00.371Z" },
]