From 145763fcfb70490635842869f9da61dfa9abbf9e Mon Sep 17 00:00:00 2001 From: Ryan Malloy Date: Sat, 14 Feb 2026 18:04:50 -0700 Subject: [PATCH] Rebuild TUI as 4-tab layout with 10 new widgets Replace sidebar + 5-screen layout with horizontal tab bar (F1-F4) and persistent StatusStrip. Consolidate Position + Scan screens into Signal (F3) with Monitor/Sweep/Sky Map sub-modes via ModeBar. Screens: - F1 Dashboard: system health, tracking panel, quick actions, presets - F2 Control: motor tuning, compass rose, preset management - F3 Signal: RSSI monitor, 1D sweep (SweepPlot), 2D sky map (heatmap) - F4 System: NVS editor with regex filter, EEPROM, firmware info - F5 Console: push/pop overlay (no longer a tab) New widgets: StatusStrip, ModeBar, SweepPlot, QuickActions, PresetList, ReceiverInfo, MotorTuning, NvsFilter, SystemHealth, TrackingPanel. Removed: PositionScreen, ScanScreen, DeviceStatusBar (functionality absorbed into new screens and StatusStrip). App-level position poll feeds StatusStrip and active screen at ~2 Hz. Fix shared threading.Event across instances (class-level mutable default). --- tui/pyproject.toml | 68 +-- tui/src/birdcage_tui/app.py | 252 ++++++++-- tui/src/birdcage_tui/screens/console.py | 98 ++-- tui/src/birdcage_tui/screens/control.py | 451 ++++++++++++++++++ tui/src/birdcage_tui/screens/dashboard.py | 153 ++++++ tui/src/birdcage_tui/screens/position.py | 286 ----------- tui/src/birdcage_tui/screens/scan.py | 272 ----------- tui/src/birdcage_tui/screens/system.py | 271 ++++++++++- tui/src/birdcage_tui/theme.tcss | 300 +++++++++--- tui/src/birdcage_tui/widgets/__init__.py | 22 +- .../birdcage_tui/widgets/device_status_bar.py | 92 ---- tui/src/birdcage_tui/widgets/mode_bar.py | 72 +++ tui/src/birdcage_tui/widgets/motor_tuning.py | 130 +++++ tui/src/birdcage_tui/widgets/nvs_filter.py | 72 +++ tui/src/birdcage_tui/widgets/preset_list.py | 216 +++++++++ tui/src/birdcage_tui/widgets/quick_actions.py | 48 ++ tui/src/birdcage_tui/widgets/receiver_info.py | 156 ++++++ tui/src/birdcage_tui/widgets/status_strip.py | 111 +++++ tui/src/birdcage_tui/widgets/sweep_plot.py | 157 ++++++ tui/src/birdcage_tui/widgets/system_health.py | 195 ++++++++ .../birdcage_tui/widgets/tracking_panel.py | 164 +++++++ tui/tests/test_sweep_stop.py | 3 +- tui/uv.lock | 72 ++- 23 files changed, 2796 insertions(+), 865 deletions(-) create mode 100644 tui/src/birdcage_tui/screens/control.py create mode 100644 tui/src/birdcage_tui/screens/dashboard.py delete mode 100644 tui/src/birdcage_tui/screens/position.py delete mode 100644 tui/src/birdcage_tui/screens/scan.py delete mode 100644 tui/src/birdcage_tui/widgets/device_status_bar.py create mode 100644 tui/src/birdcage_tui/widgets/mode_bar.py create mode 100644 tui/src/birdcage_tui/widgets/motor_tuning.py create mode 100644 tui/src/birdcage_tui/widgets/nvs_filter.py create mode 100644 tui/src/birdcage_tui/widgets/preset_list.py create mode 100644 tui/src/birdcage_tui/widgets/quick_actions.py create mode 100644 tui/src/birdcage_tui/widgets/receiver_info.py create mode 100644 tui/src/birdcage_tui/widgets/status_strip.py create mode 100644 tui/src/birdcage_tui/widgets/sweep_plot.py create mode 100644 tui/src/birdcage_tui/widgets/system_health.py create mode 100644 tui/src/birdcage_tui/widgets/tracking_panel.py diff --git a/tui/pyproject.toml b/tui/pyproject.toml index 85f6d7e..1e7dab8 100644 --- a/tui/pyproject.toml +++ b/tui/pyproject.toml @@ -1,31 +1,37 @@ -[build-system] -requires = ["hatchling"] -build-backend = "hatchling.build" - -[project] -name = "birdcage-tui" -version = "2026.02.13" -description = "Textual TUI for Winegard Carryout G2 satellite dish control" -license = "MIT" -requires-python = ">=3.11" -authors = [{name = "Ryan Malloy", email = "ryan@supported.systems"}] -dependencies = [ - "birdcage", - "textual>=1.0.0", -] - -[project.scripts] -birdcage-tui = "birdcage_tui.app:main" - -[tool.uv.sources] -birdcage = { path = ".." } - -[tool.ruff] -target-version = "py311" -src = ["src"] - -[tool.ruff.lint] -select = ["E", "F", "I", "UP", "B", "SIM"] - -[tool.hatch.build.targets.wheel] -packages = ["src/birdcage_tui"] +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[project] +name = "birdcage-tui" +version = "2026.02.13" +description = "Textual TUI for Winegard Carryout G2 satellite dish control" +license = "MIT" +requires-python = ">=3.11" +authors = [{name = "Ryan Malloy", email = "ryan@supported.systems"}] +dependencies = [ + "birdcage", + "textual>=1.0.0", +] + +[project.scripts] +birdcage-tui = "birdcage_tui.app:main" + +[tool.uv.sources] +birdcage = { path = "..", editable = true } + +[tool.ruff] +target-version = "py311" +src = ["src"] + +[tool.ruff.lint] +select = ["E", "F", "I", "UP", "B", "SIM"] + +[tool.hatch.build.targets.wheel] +packages = ["src/birdcage_tui"] + +[dependency-groups] +dev = [ + "pytest>=9.0.2", + "pytest-asyncio>=1.3.0", +] diff --git a/tui/src/birdcage_tui/app.py b/tui/src/birdcage_tui/app.py index 87f03f8..07a2650 100644 --- a/tui/src/birdcage_tui/app.py +++ b/tui/src/birdcage_tui/app.py @@ -1,33 +1,35 @@ """Birdcage TUI — main application shell. -ContentSwitcher-based layout with sidebar navigation (F1-F5), -device status bar, and five swappable screen panels. +Horizontal tab bar (F1-F4) with persistent StatusStrip, ContentSwitcher +for four task-oriented screens, and F5 console overlay. App-level position +polling feeds the StatusStrip regardless of which tab is active. """ import argparse +import contextlib import logging import threading +from textual import work from textual.app import App, ComposeResult from textual.binding import Binding -from textual.containers import Horizontal, Vertical -from textual.widgets import Button, ContentSwitcher, Footer, Header, Static +from textual.containers import Horizontal +from textual.widgets import Button, ContentSwitcher, Footer, Header -from birdcage_tui.screens.console import ConsoleScreen -from birdcage_tui.screens.position import PositionScreen -from birdcage_tui.screens.scan import ScanScreen +from birdcage_tui.screens.console import ConsoleOverlay +from birdcage_tui.screens.control import ControlScreen +from birdcage_tui.screens.dashboard import DashboardScreen from birdcage_tui.screens.signal import SignalScreen from birdcage_tui.screens.system import SystemScreen -from birdcage_tui.widgets.device_status_bar import DeviceStatusBar +from birdcage_tui.widgets.status_strip import StatusStrip log = logging.getLogger(__name__) -MODES: dict[str, tuple[str, type]] = { - "position": ("F1 Position", PositionScreen), - "signal": ("F2 Signal", SignalScreen), - "scan": ("F3 Scan", ScanScreen), +TABS: dict[str, tuple[str, type]] = { + "dashboard": ("F1 Dashboard", DashboardScreen), + "control": ("F2 Control", ControlScreen), + "signal": ("F3 Signal", SignalScreen), "system": ("F4 System", SystemScreen), - "console": ("F5 Console", ConsoleScreen), } @@ -38,11 +40,11 @@ class BirdcageApp(App): CSS_PATH = "theme.tcss" BINDINGS = [ - Binding("f1", "switch_mode('position')", "Position"), - Binding("f2", "switch_mode('signal')", "Signal"), - Binding("f3", "switch_mode('scan')", "Scan"), - Binding("f4", "switch_mode('system')", "System"), - Binding("f5", "switch_mode('console')", "Console"), + Binding("f1", "switch_tab('dashboard')", "Dashboard"), + Binding("f2", "switch_tab('control')", "Control"), + Binding("f3", "switch_tab('signal')", "Signal"), + Binding("f4", "switch_tab('system')", "System"), + Binding("f5", "toggle_console", "Console"), Binding("q", "quit", "Quit"), Binding("d", "toggle_dark", "Dark"), ] @@ -55,6 +57,13 @@ class BirdcageApp(App): device: object = None shutdown_event: threading.Event = threading.Event() + # App-level position cache (updated by background poll). + _current_az: float = 0.0 + _current_el: float = 0.0 + _prev_az: float = 0.0 + _prev_el: float = 0.0 + _console_visible: bool = False + @property def SUB_TITLE(self) -> str: # noqa: N802 if self.demo_mode: @@ -63,22 +72,25 @@ class BirdcageApp(App): def compose(self) -> ComposeResult: yield Header() - with Horizontal(id="main-area"): - with Vertical(id="sidebar"): - yield Static("\U0001f6f0\ufe0f Birdcage", classes="sidebar-title") - yield Static("Carryout G2", classes="sidebar-subtitle") - for mode_key, (label, _) in MODES.items(): - yield Button(label, id=f"btn-{mode_key}", classes="sidebar-btn") - yield DeviceStatusBar(id="device-status") - with ContentSwitcher(id="content-area", initial="position"): - for mode_key, (_, screen_cls) in MODES.items(): - yield screen_cls(id=mode_key) + yield StatusStrip(id="status-strip") + with Horizontal(id="tab-bar"): + for tab_key, (label, _) in TABS.items(): + yield Button(label, id=f"tab-{tab_key}", classes="tab-btn") + with ContentSwitcher(id="content-area", initial="dashboard"): + for tab_key, (_, screen_cls) in TABS.items(): + yield screen_cls(id=tab_key) yield Footer() def on_mount(self) -> None: - self.query_one("#btn-position").add_class("active") + # Fresh event per instance — class-level default is shared across instances. + self.shutdown_event = threading.Event() + self.query_one("#tab-dashboard").add_class("active") self._setup_device() + # ------------------------------------------------------------------ + # Device lifecycle + # ------------------------------------------------------------------ + def _setup_device(self) -> None: """Create device (demo or real) and hand it to each screen.""" if self.demo_mode: @@ -98,6 +110,9 @@ class BirdcageApp(App): self.run_worker(self._initialize_device, thread=True) self._distribute_device() + self._update_status_strip_connection() + self._install_console() + self._start_position_poll() async def _initialize_device(self) -> None: """Run device init in a worker thread (blocks on serial I/O).""" @@ -109,44 +124,176 @@ class BirdcageApp(App): def _distribute_device(self) -> None: """Pass the device reference to every screen that wants it.""" - for mode_key in MODES: - screen = self.query_one(f"#{mode_key}") + for tab_key in TABS: + screen = self.query_one(f"#{tab_key}") if hasattr(screen, "set_device"): screen.set_device(self.device) - status_bar = self.query_one("#device-status", DeviceStatusBar) - if hasattr(status_bar, "set_device"): - status_bar.set_device(self.device) + def _update_status_strip_connection(self) -> None: + """Set the status strip's connection info from current device.""" + strip = self.query_one("#status-strip", StatusStrip) + is_demo = type(self.device).__name__ == "DemoDevice" if self.device else False + strip.demo = is_demo or self.demo_mode + strip.connected = ( + self.device is not None + and getattr(self.device, "is_connected", False) + and not strip.demo + ) + strip.port = self.serial_port - def action_switch_mode(self, mode: str) -> None: - """Switch the content area to *mode* and update sidebar highlight.""" + def _install_console(self) -> None: + """Pre-install the console overlay so it persists across open/close.""" + self.install_screen(ConsoleOverlay(), name="console-overlay") + + # ------------------------------------------------------------------ + # App-level position poll + # ------------------------------------------------------------------ + + @work(thread=True, exclusive=True, group="app-position-poll") + def _start_position_poll(self) -> None: + """Poll device at ~2 Hz for position, update StatusStrip globally.""" + shutdown = self.shutdown_event + while not shutdown.is_set(): + if self.device is None: + shutdown.wait(0.5) + continue + + try: + pos = self.device.get_position() + az = pos["azimuth"] + el = pos["elevation"] + self._current_az = az + self._current_el = el + self.call_from_thread(self._update_position, az, el) + except Exception: + log.debug("App position poll failed", exc_info=True) + + shutdown.wait(0.5) + + def _update_position(self, az: float, el: float) -> None: + """Push position to StatusStrip and active screen.""" + strip = self.query_one("#status-strip", StatusStrip) + strip.azimuth = az + strip.elevation = el + + # Detect movement from position delta + delta = abs(az - self._prev_az) + abs(el - self._prev_el) + if delta > 0.05: + strip.motor_state = "MOVING" + else: + strip.motor_state = "IDLE" + self._prev_az = az + self._prev_el = el + + # Notify active screen switcher = self.query_one("#content-area", ContentSwitcher) - switcher.current = mode + if switcher.current: + try: + active = self.query_one(f"#{switcher.current}") + if hasattr(active, "on_position_update"): + active.on_position_update(az, el) + except Exception: + pass - for btn in self.query(".sidebar-btn"): + # ------------------------------------------------------------------ + # Tab switching + # ------------------------------------------------------------------ + + def action_switch_tab(self, tab: str) -> None: + """Switch the content area to *tab* and update tab bar highlight.""" + switcher = self.query_one("#content-area", ContentSwitcher) + switcher.current = tab + + for btn in self.query(".tab-btn"): btn.remove_class("active") - self.query_one(f"#btn-{mode}").add_class("active") + self.query_one(f"#tab-{tab}").add_class("active") - screen = self.query_one(f"#{mode}") + screen = self.query_one(f"#{tab}") if hasattr(screen, "on_show"): screen.on_show() + # ------------------------------------------------------------------ + # Console overlay + # ------------------------------------------------------------------ + + def action_toggle_console(self) -> None: + """Push or pop the console overlay.""" + if self._console_visible: + # Pop the console -- dismiss triggers the callback + try: + self.pop_screen() + except Exception: + self._console_visible = False + else: + self.push_screen("console-overlay", callback=self._on_console_dismissed) + self._console_visible = True + + def _on_console_dismissed(self, _result=None) -> None: + """Called when the console overlay is dismissed.""" + self._console_visible = False + + # ------------------------------------------------------------------ + # Tab bar button handling + # ------------------------------------------------------------------ + + def on_button_pressed(self, event: Button.Pressed) -> None: + button_id = event.button.id or "" + if button_id.startswith("tab-"): + tab = button_id.removeprefix("tab-") + if tab in TABS: + self.action_switch_tab(tab) + + # ------------------------------------------------------------------ + # QuickActions navigation (from Dashboard) + # ------------------------------------------------------------------ + + def on_quick_actions_action_selected(self, event) -> None: + """Handle navigation requests from the Dashboard's quick actions.""" + action = event.action + if action == "point": + self.action_switch_tab("control") + elif action == "monitor": + self.action_switch_tab("signal") + elif action == "scan": + self.action_switch_tab("signal") + # Tell the signal screen to switch to skymap mode + try: + signal_screen = self.query_one("#signal") + if hasattr(signal_screen, "switch_mode"): + signal_screen.switch_mode("skymap") + except Exception: + pass + elif action == "stow": + self._do_stow() + + def _do_stow(self) -> None: + """Move dish to stow position (0, 65).""" + if self.device is None: + self.notify("No device connected", severity="warning") + return + self.notify("Stowing dish to AZ=0 EL=65...", severity="information") + self._run_stow() + + @work(thread=True) + def _run_stow(self) -> None: + """Execute stow in a worker thread.""" + try: + self.device.move_to(0.0, 65.0) + self.call_from_thread(self.notify, "Stow command sent") + except Exception: + log.exception("Stow failed") + self.call_from_thread(self.notify, "Stow failed", severity="error") + + # ------------------------------------------------------------------ + # Lifecycle + # ------------------------------------------------------------------ + def on_unmount(self) -> None: """Signal all worker threads to exit and disconnect the device.""" self.shutdown_event.set() if self.device and hasattr(self.device, "disconnect"): self.device.disconnect() - def action_toggle_dark(self) -> None: - self.dark = not self.dark - - def on_button_pressed(self, event: Button.Pressed) -> None: - button_id = event.button.id or "" - if button_id.startswith("btn-"): - mode = button_id.removeprefix("btn-") - if mode in MODES: - self.action_switch_mode(mode) - def main() -> None: parser = argparse.ArgumentParser( @@ -176,5 +323,6 @@ def main() -> None: pass finally: app.shutdown_event.set() - if app.device and hasattr(app.device, "disconnect"): - app.device.disconnect() + with contextlib.suppress(Exception): + if app.device and hasattr(app.device, "disconnect"): + app.device.disconnect() diff --git a/tui/src/birdcage_tui/screens/console.py b/tui/src/birdcage_tui/screens/console.py index 2c1e22d..2f4d213 100644 --- a/tui/src/birdcage_tui/screens/console.py +++ b/tui/src/birdcage_tui/screens/console.py @@ -1,12 +1,17 @@ -"""F5 Console Screen -- raw serial terminal with color-coded prompts -and command history.""" +"""Console overlay -- raw serial terminal as a slide-up ModalScreen. + +Pushed via F5, dismissed via Escape or F5 again. Preserves command +history across open/close cycles when installed via install_screen(). +""" import re from textual import work from textual.app import ComposeResult +from textual.binding import Binding from textual.containers import Container, Horizontal from textual.events import Key +from textual.screen import ModalScreen from textual.widgets import Button, Input, Static from birdcage_tui.widgets.serial_log import SerialLog @@ -27,8 +32,7 @@ _KNOWN_PROMPTS = [ "DIPSWITCH>", ] -# Pattern to detect NVS write commands: "nvs" ... "e " -# or just "e " when already in the NVS submenu. +# Pattern to detect NVS write commands. _NVS_WRITE_RE = re.compile(r"e\s+\d+\s+\S+") @@ -44,20 +48,29 @@ def _detect_prompt(text: str) -> str | None: return last_prompt -class ConsoleScreen(Container): - """F5: Raw serial console for direct firmware interaction.""" +class ConsoleOverlay(ModalScreen): + """F5: Raw serial console as a slide-up overlay. - def __init__(self, **kwargs) -> None: - super().__init__(**kwargs) - self._device: object | None = None + Slides up from the bottom of the terminal, taking ~50% of the viewport. + The active screen remains visible (dimmed) above. Dismissed by Escape or F5. + """ + + BINDINGS = [ + Binding("escape", "dismiss_overlay", "Close", priority=True), + Binding("f5", "dismiss_overlay", "Close", priority=True), + ] + + def __init__(self) -> None: + super().__init__() self._command_history: list[str] = [] self._history_idx: int = 0 self._cmd_count: int = 0 self._prompt_ctx: str = "TRK>" self._last_dangerous_cmd: str | None = None + self._welcomed = False def compose(self) -> ComposeResult: - with Container(classes="screen-container"): + with Container(id="console-overlay"): yield SerialLog(id="serial-log") with Horizontal(classes="console-context"): yield Static("Context: TRK>", id="console-context") @@ -67,29 +80,43 @@ class ConsoleScreen(Container): yield Input(placeholder="Enter command...", id="console-input") yield Button("Send", id="btn-send", variant="primary") - def set_device(self, device: object) -> None: - """Store the device reference and show a welcome message.""" - self._device = device + def on_mount(self) -> None: + """Show welcome message on first mount.""" + if not self._welcomed: + self._show_welcome() + self._welcomed = True + self.query_one("#console-input", Input).focus() + def on_screen_resume(self) -> None: + """Re-focus input when the overlay is re-opened.""" + self.query_one("#console-input", Input).focus() + + def _show_welcome(self) -> None: + """Display connection info in the serial log.""" serial_log = self.query_one("#serial-log", SerialLog) + device = self.app.device if hasattr(self.app, "device") else None - # Determine connection description. - if hasattr(device, "demo_mode") or type(device).__name__ == "DemoDevice": - mode_label = "DEMO" + if device is not None: + is_demo = type(device).__name__ == "DemoDevice" + mode_label = "DEMO" if is_demo else "Live" else: - mode_label = getattr(device, "firmware_name", "Live") + mode_label = "No device" - port = getattr(self.app, "serial_port", "/dev/ttyUSB0") if self.app else "---" + port = getattr(self.app, "serial_port", "---") serial_log.append_output("Birdcage Console -- type ? for help") serial_log.append_output(f"Connected to: {mode_label} / {port}") - def _check_dangerous(self, cmd: str) -> str | None: - """Return a warning message if the command is dangerous, or None if safe. + def action_dismiss_overlay(self) -> None: + """Close the console overlay.""" + self.dismiss() - If the same dangerous command is sent twice in a row, allow it through - (the user is insisting). - """ + # ------------------------------------------------------------------ + # Safety checks + # ------------------------------------------------------------------ + + def _check_dangerous(self, cmd: str) -> str | None: + """Return a warning if the command is dangerous, or None if safe.""" stripped = cmd.strip() lower = stripped.lower() @@ -121,8 +148,12 @@ class ConsoleScreen(Container): return warning + # ------------------------------------------------------------------ + # Command dispatch + # ------------------------------------------------------------------ + def _do_send(self, cmd_text: str) -> None: - """Validate and dispatch a command. Called on Enter or Send button.""" + """Validate and dispatch a command.""" cmd_text = cmd_text.strip() if not cmd_text: return @@ -141,23 +172,24 @@ class ConsoleScreen(Container): serial_log = self.query_one("#serial-log", SerialLog) serial_log.append_command(cmd_text) - # Clear input right away so the user can type while waiting. + # Clear input. self.query_one("#console-input", Input).value = "" - # Dispatch to worker thread (serial I/O blocks). + # Dispatch to worker thread. self._send_command(cmd_text) @work(thread=True) def _send_command(self, cmd: str) -> None: """Send the command over serial and update the UI with the response.""" - if self._device is None: + device = self.app.device if hasattr(self.app, "device") else None + if device is None: self.app.call_from_thread( self.notify, "No device connected", severity="error" ) return try: - response = self._device.send_raw(cmd) + response = device.send_raw(cmd) except Exception as exc: self.app.call_from_thread(self._on_response, f"ERROR: {exc}", cmd) return @@ -176,6 +208,15 @@ class ConsoleScreen(Container): ctx_label = self.query_one("#console-context", Static) ctx_label.update(f"Context: {self._prompt_ctx}") + # Update the app-level StatusStrip menu indicator + from birdcage_tui.widgets.status_strip import StatusStrip + + try: + strip = self.app.query_one("#status-strip", StatusStrip) + strip.fw_menu = self._prompt_ctx + except Exception: + pass + # Update command count. self._cmd_count += 1 count_label = self.query_one("#console-cmd-count", Static) @@ -200,7 +241,6 @@ class ConsoleScreen(Container): """Handle up/down arrow keys for command history navigation.""" cmd_input = self.query_one("#console-input", Input) - # Only respond when the input widget has focus. if not cmd_input.has_focus: return diff --git a/tui/src/birdcage_tui/screens/control.py b/tui/src/birdcage_tui/screens/control.py new file mode 100644 index 0000000..c8e32e5 --- /dev/null +++ b/tui/src/birdcage_tui/screens/control.py @@ -0,0 +1,451 @@ +"""F2 Control screen -- Point My Dish with Manual, Presets, and Track sub-modes. + +Three-mode layout driven by a ModeBar and ContentSwitcher. Manual mode provides +compass rose, motor status, sparklines, and move/home/engage controls. Presets +mode manages saved AZ/EL targets. Track mode wraps the rotctld server lifecycle. +""" + +import contextlib +import logging + +from textual import work +from textual.app import ComposeResult +from textual.binding import Binding +from textual.containers import Container, Horizontal, Vertical +from textual.widgets import Button, ContentSwitcher, Input, Static +from textual.worker import Worker + +from birdcage_tui.widgets.compass_rose import CompassRose +from birdcage_tui.widgets.mode_bar import ModeBar +from birdcage_tui.widgets.motor_status import MotorStatus +from birdcage_tui.widgets.preset_list import PresetList +from birdcage_tui.widgets.sparkline_widget import SparklineWidget +from birdcage_tui.widgets.tracking_panel import TrackingPanel + +log = logging.getLogger(__name__) + + +class ControlScreen(Container): + """F2: Point My Dish -- Manual / Presets / Track sub-modes.""" + + can_focus = True + + BINDINGS = [ + Binding("left", "nudge_az(-1)", "AZ -1", show=False), + Binding("right", "nudge_az(1)", "AZ +1", show=False), + Binding("up", "nudge_el(1)", "EL +1", show=False), + Binding("down", "nudge_el(-1)", "EL -1", show=False), + Binding("h", "home_both", "Home Both", show=False), + Binding("e", "engage_motors", "Engage", show=False), + Binding("r", "release_motors", "Release", show=False), + ] + + def __init__(self, **kwargs) -> None: + super().__init__(**kwargs) + self._device: object = None + self._polling = False + self._engaged = False + self._poll_worker: Worker | None = None + self._last_az = 180.0 + self._last_el = 45.0 + + # ------------------------------------------------------------------ + # Compose + # ------------------------------------------------------------------ + + def compose(self) -> ComposeResult: + with Container(classes="screen-container"): + yield ModeBar( + modes={ + "manual": "Manual", + "presets": "Presets", + "track": "Track", + }, + initial="manual", + classes="mode-bar", + ) + with ContentSwitcher(id="control-modes", initial="manual"): + # -- Manual mode -- + with Container(id="manual"): + with Horizontal(classes="top-row"): + yield CompassRose(id="ctrl-compass") + with Vertical(classes="panel"): + yield Static("Motor Status", classes="panel-title") + yield MotorStatus(id="ctrl-motor-status") + with Vertical(): + yield SparklineWidget( + max_points=80, + label="AZ", + color="#00d4aa", + id="ctrl-az-spark", + ) + yield SparklineWidget( + max_points=80, + label="EL", + color="#00b8c8", + id="ctrl-el-spark", + ) + with Horizontal(classes="bottom-controls"): + yield Static("AZ ", classes="label") + yield Input( + placeholder="180.0", + id="ctrl-az-input", + type="number", + ) + yield Static(" EL ", classes="label") + yield Input( + placeholder="45.0", + id="ctrl-el-input", + type="number", + ) + yield Button("Move", id="btn-ctrl-move", variant="primary") + with Horizontal(classes="bottom-controls"): + yield Button("Home AZ", id="btn-ctrl-home-az") + yield Button("Home EL", id="btn-ctrl-home-el") + yield Button("E/R", id="btn-ctrl-engage") + + # -- Presets mode -- + with Container(id="presets"), Horizontal(classes="top-row"): + yield PresetList(id="ctrl-preset-list") + with Vertical(classes="panel"): + yield Static("Current Position", classes="panel-title") + yield CompassRose(id="ctrl-preset-compass") + + # -- Track mode -- + with Container(id="track"), Vertical(classes="panel"): + yield Static("Satellite Tracking", classes="panel-title") + yield TrackingPanel(id="ctrl-tracking-panel") + + # ------------------------------------------------------------------ + # Device lifecycle + # ------------------------------------------------------------------ + + def set_device(self, device: object) -> None: + """Store the device reference and start the data poll.""" + self._device = device + self._start_data_poll() + + def on_show(self) -> None: + """Resume polling when this screen becomes visible.""" + if self._device is not None and not self._polling: + self._start_data_poll() + + def on_unmount(self) -> None: + """Stop polling thread on teardown.""" + self._polling = False + + # ------------------------------------------------------------------ + # Mode switching + # ------------------------------------------------------------------ + + def on_mode_bar_mode_changed(self, event: ModeBar.ModeChanged) -> None: + """Switch the ContentSwitcher when the ModeBar selection changes.""" + switcher = self.query_one("#control-modes", ContentSwitcher) + switcher.current = event.mode + + def switch_mode(self, mode_key: str) -> None: + """Programmatically switch to a sub-mode (called by app.py).""" + switcher = self.query_one("#control-modes", ContentSwitcher) + switcher.current = mode_key + # Update ModeBar button highlight + mode_bar = self.query_one(ModeBar) + for btn in mode_bar.query(".mode-btn"): + btn.remove_class("active") + with contextlib.suppress(Exception): + mode_bar.query_one(f"#mode-{mode_key}").add_class("active") + + # ------------------------------------------------------------------ + # Position updates (called by app-level poll) + # ------------------------------------------------------------------ + + def on_position_update(self, az: float, el: float) -> None: + """Push position to compass roses, sparklines, and internal cache.""" + self._last_az = az + self._last_el = el + + # Manual mode compass + try: + compass = self.query_one("#ctrl-compass", CompassRose) + compass.azimuth = az + compass.elevation = el + except Exception: + pass + + # Presets mode compass + try: + preset_compass = self.query_one("#ctrl-preset-compass", CompassRose) + preset_compass.azimuth = az + preset_compass.elevation = el + except Exception: + pass + + # Sparklines + try: + self.query_one("#ctrl-az-spark", SparklineWidget).push(az) + self.query_one("#ctrl-el-spark", SparklineWidget).push(el) + except Exception: + pass + + # ------------------------------------------------------------------ + # Data poll worker (step positions + torque) + # ------------------------------------------------------------------ + + def _start_data_poll(self) -> None: + """Kick off the background data poll for motor status.""" + self._polling = True + self._poll_worker = self._do_data_poll() + + @work(thread=True, exclusive=True, group="control-data-poll") + def _do_data_poll(self) -> None: + """Poll step positions and torque at ~2 Hz while active.""" + shutdown = self.app.shutdown_event + while self._polling and not shutdown.is_set(): + if self._device is None: + shutdown.wait(0.5) + continue + + # Step positions + try: + steps = self._device.get_step_positions() + self.app.call_from_thread( + self._update_motor_steps, + steps["az_steps"], + steps["el_steps"], + ) + except Exception: + log.debug("Step position poll failed", exc_info=True) + + # Torque state from A3981 + try: + torque_resp = self._device.get_a3981_torque() + az_torque = "HIGH" if "AZ Torq:HIGH" in torque_resp else "LOW" + el_torque = "HIGH" if "EL Torq:HIGH" in torque_resp else "LOW" + self.app.call_from_thread(self._update_torque, az_torque, el_torque) + except Exception: + log.debug("Torque poll failed", exc_info=True) + + shutdown.wait(0.5) + + # ------------------------------------------------------------------ + # Thread-safe widget update callbacks + # ------------------------------------------------------------------ + + def _update_motor_steps(self, az_steps: int, el_steps: int) -> None: + try: + motor = self.query_one("#ctrl-motor-status", MotorStatus) + motor.az_steps = az_steps + motor.el_steps = el_steps + except Exception: + pass + + def _update_torque(self, az_torque: str, el_torque: str) -> None: + try: + motor = self.query_one("#ctrl-motor-status", MotorStatus) + motor.az_torque = az_torque + motor.el_torque = el_torque + except Exception: + pass + + def _update_engaged(self, engaged: bool) -> None: + try: + motor = self.query_one("#ctrl-motor-status", MotorStatus) + motor.engaged = engaged + except Exception: + pass + + # ------------------------------------------------------------------ + # Button handlers + # ------------------------------------------------------------------ + + def on_button_pressed(self, event: Button.Pressed) -> None: + button_id = event.button.id or "" + + if button_id == "btn-ctrl-move": + self._handle_move() + elif button_id == "btn-ctrl-home-az": + self._handle_home(0) + elif button_id == "btn-ctrl-home-el": + self._handle_home(1) + elif button_id == "btn-ctrl-engage": + self._handle_engage_toggle() + + def _handle_move(self) -> None: + """Read AZ/EL inputs and issue a move command.""" + if self._device is None: + self.app.notify("No device connected", severity="warning") + return + + az_input = self.query_one("#ctrl-az-input", Input) + el_input = self.query_one("#ctrl-el-input", Input) + + try: + az = float(az_input.value) if az_input.value.strip() else self._last_az + except ValueError: + self.app.notify("Invalid AZ value", severity="warning") + return + + try: + el = float(el_input.value) if el_input.value.strip() else self._last_el + except ValueError: + self.app.notify("Invalid EL value", severity="warning") + return + + self.app.notify(f"Moving to AZ={az:.1f} EL={el:.1f}", severity="information") + self._run_motor_command(self._device.move_to, az, el) + + def _handle_home(self, motor_id: int) -> None: + """Home a specific motor axis.""" + if self._device is None: + self.app.notify("No device connected", severity="warning") + return + axis = "AZ" if motor_id == 0 else "EL" + self.app.notify(f"Homing {axis}...", severity="information") + self._run_motor_command(self._device.home_motor, motor_id) + + def _handle_engage_toggle(self) -> None: + """Toggle motor engage/release state.""" + if self._device is None: + self.app.notify("No device connected", severity="warning") + return + + if self._engaged: + self._run_motor_command(self._device.release) + self._engaged = False + self._update_engaged(False) + self.app.notify("Motors released") + else: + self._run_motor_command(self._device.engage) + self._engaged = True + self._update_engaged(True) + self.app.notify("Motors engaged") + + # ------------------------------------------------------------------ + # Preset handlers + # ------------------------------------------------------------------ + + def on_preset_list_go_to_preset(self, event: PresetList.GoToPreset) -> None: + """Move dish to the selected preset's AZ/EL.""" + if self._device is None: + self.app.notify("No device connected", severity="warning") + return + + az = event.az if event.az is not None else self._last_az + el = event.el + self.app.notify( + f"Moving to preset AZ={az:.1f} EL={el:.1f}", + severity="information", + ) + self._run_motor_command(self._device.move_to, az, el) + + def on_preset_list_save_requested(self, _event: PresetList.SaveRequested) -> None: + """Save the current position as a new preset.""" + preset_list = self.query_one("#ctrl-preset-list", PresetList) + name_input = preset_list.query_one("#preset-name-input", Input) + name = name_input.value.strip() + + if not name: + self.app.notify("Enter a preset name first", severity="warning") + return + + preset_list.save_preset(name, self._last_az, self._last_el) + name_input.value = "" + self.app.notify( + f"Saved preset '{name}' at AZ={self._last_az:.1f} EL={self._last_el:.1f}" + ) + + # ------------------------------------------------------------------ + # Tracking handlers + # ------------------------------------------------------------------ + + def on_tracking_panel_start_requested( + self, event: TrackingPanel.StartRequested + ) -> None: + """Handle rotctld start request (placeholder for Phase 6).""" + log.info( + "Tracking start requested: %s:%d min_el=%.1f", + event.host, + event.port, + event.min_el, + ) + panel = self.query_one("#ctrl-tracking-panel", TrackingPanel) + panel.set_status(state="LISTENING") + self.app.notify( + f"Tracking server listening on {event.host}:{event.port}", + severity="information", + ) + + def on_tracking_panel_stop_requested( + self, _event: TrackingPanel.StopRequested + ) -> None: + """Handle rotctld stop request (placeholder for Phase 6).""" + log.info("Tracking stop requested") + panel = self.query_one("#ctrl-tracking-panel", TrackingPanel) + panel.set_status(state="STOPPED") + self.app.notify("Tracking server stopped") + + # ------------------------------------------------------------------ + # Key binding actions + # ------------------------------------------------------------------ + + def _is_manual_active(self) -> bool: + """Check if Manual mode is the currently visible sub-mode.""" + try: + switcher = self.query_one("#control-modes", ContentSwitcher) + return switcher.current == "manual" + except Exception: + return False + + def action_nudge_az(self, delta: int) -> None: + """Nudge azimuth by delta degrees.""" + if not self._is_manual_active() or self._device is None: + return + new_az = self._last_az + delta + self._run_motor_command(self._device.move_motor, 0, new_az) + + def action_nudge_el(self, delta: int) -> None: + """Nudge elevation by delta degrees.""" + if not self._is_manual_active() or self._device is None: + return + new_el = self._last_el + delta + self._run_motor_command(self._device.move_motor, 1, new_el) + + def action_home_both(self) -> None: + """Home both AZ and EL motors.""" + if not self._is_manual_active() or self._device is None: + return + self.app.notify("Homing AZ + EL...", severity="information") + self._run_motor_command(self._device.home_motor, 0) + self._run_motor_command(self._device.home_motor, 1) + + def action_engage_motors(self) -> None: + """Engage (energize) stepper motors.""" + if not self._is_manual_active() or self._device is None: + return + self._run_motor_command(self._device.engage) + self._engaged = True + self._update_engaged(True) + self.app.notify("Motors engaged") + + def action_release_motors(self) -> None: + """Release (de-energize) stepper motors.""" + if not self._is_manual_active() or self._device is None: + return + self._run_motor_command(self._device.release) + self._engaged = False + self._update_engaged(False) + self.app.notify("Motors released") + + # ------------------------------------------------------------------ + # Motor command worker + # ------------------------------------------------------------------ + + @work(thread=True, exclusive=False, group="motor-cmd") + def _run_motor_command(self, fn, *args) -> None: + """Execute a motor command in a worker thread.""" + try: + fn(*args) + except Exception: + log.exception("Motor command failed") + self.app.call_from_thread( + self.app.notify, "Motor command failed", severity="error" + ) diff --git a/tui/src/birdcage_tui/screens/dashboard.py b/tui/src/birdcage_tui/screens/dashboard.py new file mode 100644 index 0000000..f8a0bd4 --- /dev/null +++ b/tui/src/birdcage_tui/screens/dashboard.py @@ -0,0 +1,153 @@ +"""F1 Dashboard screen -- at-a-glance status and quick actions. + +Home base for the Birdcage TUI. Shows a compass rose with live AZ/EL +position, a signal gauge with RSSI sparkline, quick-action buttons for +common tasks, and a system health summary pulled from firmware on mount. +""" + +import logging + +from textual import work +from textual.app import ComposeResult +from textual.containers import Container, Horizontal, Vertical +from textual.widgets import Static + +from birdcage_tui.widgets.compass_rose import CompassRose +from birdcage_tui.widgets.quick_actions import QuickActions +from birdcage_tui.widgets.signal_gauge import SignalGauge +from birdcage_tui.widgets.sparkline_widget import SparklineWidget +from birdcage_tui.widgets.system_health import SystemHealthPanel + +log = logging.getLogger(__name__) + +DeviceLike = object + + +class DashboardScreen(Container): + """F1: At-a-glance status overview with quick actions.""" + + def __init__(self, **kwargs) -> None: + super().__init__(**kwargs) + self._device: DeviceLike | None = None + self._health_loaded = False + + def compose(self) -> ComposeResult: + with Container(classes="screen-container"): + with Horizontal(classes="top-row"): + with Vertical(classes="panel"): + yield Static("Position", classes="panel-title") + yield CompassRose(id="dash-compass") + with Vertical(classes="panel"): + yield Static("Signal", classes="panel-title") + yield SignalGauge(id="dash-signal") + yield SparklineWidget( + max_points=80, + label="DVB RSSI", + color="#00d4aa", + id="dash-rssi-spark", + ) + yield QuickActions(classes="quick-actions") + with Vertical(classes="system-health"): + yield Static("System Health", classes="panel-title") + yield SystemHealthPanel(id="dash-health") + + # ------------------------------------------------------------------ + # Device lifecycle + # ------------------------------------------------------------------ + + def set_device(self, device: object) -> None: + """Store the device reference and kick off health data load.""" + self._device = device + self._load_health_data() + + def on_show(self) -> None: + """Load health data on first activation if not yet fetched.""" + if not self._health_loaded and self._device is not None: + self._load_health_data() + + # ------------------------------------------------------------------ + # Position updates (called by app-level 2 Hz poll) + # ------------------------------------------------------------------ + + def on_position_update(self, az: float, el: float) -> None: + """Push live AZ/EL into the compass rose.""" + compass = self.query_one("#dash-compass", CompassRose) + compass.azimuth = az + compass.elevation = el + + # ------------------------------------------------------------------ + # System health loader + # ------------------------------------------------------------------ + + @work(thread=True, exclusive=True, group="dash-health") + def _load_health_data(self) -> None: + """Fetch hardware diagnostics from firmware and populate the panel.""" + if self._device is None: + return + + shutdown = getattr(self.app, "shutdown_event", None) + if shutdown is not None and shutdown.is_set(): + return + + diag = "" + torque = "" + fw_id = "" + motor_life = "" + el_limits: dict[str, float] = {"min": 0.0, "max": 0.0, "home": 0.0} + + try: + diag = self._device.get_a3981_diag() + except Exception: + log.debug("Dashboard: failed to fetch A3981 diag", exc_info=True) + + try: + torque = self._device.get_a3981_torque() + except Exception: + log.debug("Dashboard: failed to fetch A3981 torque", exc_info=True) + + try: + fw_id = self._device.get_firmware_id() + except Exception: + log.debug("Dashboard: failed to fetch firmware ID", exc_info=True) + + try: + motor_life = self._device.get_motor_life() + except Exception: + log.debug("Dashboard: failed to fetch motor life", exc_info=True) + + try: + el_limits = self._device.get_el_limits() + except Exception: + log.debug("Dashboard: failed to fetch EL limits", exc_info=True) + + self._health_loaded = True + + try: + self.app.call_from_thread( + self._apply_health_data, + diag, + torque, + fw_id, + motor_life, + el_limits, + ) + except Exception: + log.debug("Dashboard: failed to push health data to UI", exc_info=True) + + def _apply_health_data( + self, + diag: str, + torque: str, + fw_id: str, + motor_life: str, + el_limits: dict[str, float], + ) -> None: + """Update the SystemHealthPanel on the main thread.""" + panel = self.query_one("#dash-health", SystemHealthPanel) + panel.load_data( + diag=diag, + torque=torque, + fw_id=fw_id, + motor_life=motor_life, + el_limits=el_limits, + ) diff --git a/tui/src/birdcage_tui/screens/position.py b/tui/src/birdcage_tui/screens/position.py deleted file mode 100644 index ca197b8..0000000 --- a/tui/src/birdcage_tui/screens/position.py +++ /dev/null @@ -1,286 +0,0 @@ -"""F1 Position screen -- AZ/EL display, manual moves, homing, engage/release. - -Widget container for ContentSwitcher. Polls the device at 2 Hz for -position and step data, drives the compass rose, motor status panel, -and AZ/EL sparklines. Bottom row provides manual move controls. -""" - -import logging - -from textual import work -from textual.app import ComposeResult -from textual.binding import Binding -from textual.containers import Container, Horizontal, Vertical -from textual.widgets import Button, Input, Static -from textual.worker import Worker - -from birdcage_tui.widgets.compass_rose import CompassRose -from birdcage_tui.widgets.motor_status import MotorStatus -from birdcage_tui.widgets.sparkline_widget import SparklineWidget - -log = logging.getLogger(__name__) - - -class PositionScreen(Container): - """F1: Position control and monitoring.""" - - can_focus = True - - BINDINGS = [ - Binding("left", "nudge_az(-1)", "AZ -1", show=False), - Binding("right", "nudge_az(1)", "AZ +1", show=False), - Binding("up", "nudge_el(1)", "EL +1", show=False), - Binding("down", "nudge_el(-1)", "EL -1", show=False), - Binding("h", "home_both", "Home Both", show=False), - Binding("e", "engage_motors", "Engage", show=False), - Binding("r", "release_motors", "Release", show=False), - ] - - def __init__(self, **kwargs) -> None: - super().__init__(**kwargs) - self._device: object = None - self._polling = False - self._engaged = False - self._poll_worker: Worker | None = None - # Track last-known position for nudge commands. - self._last_az = 180.0 - self._last_el = 45.0 - - def compose(self) -> ComposeResult: - with Container(classes="screen-container"): - with Horizontal(classes="top-row"): - yield CompassRose(id="compass") - with Vertical(classes="panel"): - yield Static("Motor Status", classes="panel-title") - yield MotorStatus(id="motor-status") - with Vertical(): - yield SparklineWidget( - max_points=80, label="AZ", color="#00d4aa", id="az-spark" - ) - yield SparklineWidget( - max_points=80, label="EL", color="#00b8c8", id="el-spark" - ) - with Horizontal(classes="bottom-controls"): - yield Static("AZ ", classes="label") - yield Input(placeholder="180.0", id="az-input", type="number") - yield Static(" EL ", classes="label") - yield Input(placeholder="45.0", id="el-input", type="number") - yield Button("Move", id="btn-move", variant="primary") - yield Button("Home AZ", id="btn-home-az") - yield Button("Home EL", id="btn-home-el") - yield Button("E/R", id="btn-engage") - - # ------------------------------------------------------------------ - # Device lifecycle - # ------------------------------------------------------------------ - - def set_device(self, device: object) -> None: - """Store the device reference and start position polling.""" - self._device = device - self._polling = True - self._poll_worker = self._do_position_poll() - - def on_show(self) -> None: - """Resume polling when this screen becomes visible.""" - if self._device is not None and not self._polling: - self._polling = True - self._poll_worker = self._do_position_poll() - - def on_unmount(self) -> None: - """Stop polling thread on teardown.""" - self._polling = False - - # ------------------------------------------------------------------ - # Position poll worker - # ------------------------------------------------------------------ - - @work(thread=True, exclusive=True, group="position-poll") - def _do_position_poll(self) -> None: - """Poll device at ~2 Hz for position and step data.""" - shutdown = self.app.shutdown_event - while self._polling and not shutdown.is_set(): - if self._device is None: - shutdown.wait(0.5) - continue - - try: - pos = self._device.get_position() - az = pos["azimuth"] - el = pos["elevation"] - self._last_az = az - self._last_el = el - - self.app.call_from_thread(self._update_compass, az, el) - self.app.call_from_thread(self._push_sparklines, az, el) - except Exception: - log.debug("Position poll failed", exc_info=True) - - try: - steps = self._device.get_step_positions() - self.app.call_from_thread( - self._update_motor_steps, - steps["az_steps"], - steps["el_steps"], - ) - except Exception: - log.debug("Step position poll failed", exc_info=True) - - # Poll torque state from A3981 - try: - torque_resp = self._device.get_a3981_torque() - lines = torque_resp.split("\n") - az_torque = "HIGH" if "HIGH" in lines[0] else "LOW" - el_torque = "HIGH" if len(lines) > 1 and "HIGH" in lines[1] else "LOW" - self.app.call_from_thread(self._update_torque, az_torque, el_torque) - except Exception: - log.debug("Torque poll failed", exc_info=True) - - shutdown.wait(0.5) - - # ------------------------------------------------------------------ - # Thread-safe widget update callbacks - # ------------------------------------------------------------------ - - def _update_compass(self, az: float, el: float) -> None: - compass = self.query_one("#compass", CompassRose) - compass.azimuth = az - compass.elevation = el - - def _push_sparklines(self, az: float, el: float) -> None: - self.query_one("#az-spark", SparklineWidget).push(az) - self.query_one("#el-spark", SparklineWidget).push(el) - - def _update_motor_steps(self, az_steps: int, el_steps: int) -> None: - motor = self.query_one("#motor-status", MotorStatus) - motor.az_steps = az_steps - motor.el_steps = el_steps - - def _update_torque(self, az_torque: str, el_torque: str) -> None: - motor = self.query_one("#motor-status", MotorStatus) - motor.az_torque = az_torque - motor.el_torque = el_torque - - def _update_engaged(self, engaged: bool) -> None: - motor = self.query_one("#motor-status", MotorStatus) - motor.engaged = engaged - - # ------------------------------------------------------------------ - # Button handlers - # ------------------------------------------------------------------ - - def on_button_pressed(self, event: Button.Pressed) -> None: - button_id = event.button.id or "" - - if button_id == "btn-move": - self._handle_move() - elif button_id == "btn-home-az": - self._handle_home(0) - elif button_id == "btn-home-el": - self._handle_home(1) - elif button_id == "btn-engage": - self._handle_engage_toggle() - - def _handle_move(self) -> None: - """Read AZ/EL inputs and issue a move command.""" - if self._device is None: - return - - az_input = self.query_one("#az-input", Input) - el_input = self.query_one("#el-input", Input) - - try: - az = float(az_input.value) if az_input.value.strip() else self._last_az - except ValueError: - self.app.notify("Invalid AZ value", severity="warning") - return - - try: - el = float(el_input.value) if el_input.value.strip() else self._last_el - except ValueError: - self.app.notify("Invalid EL value", severity="warning") - return - - self._run_motor_command(self._device.move_to, az, el) - - def _handle_home(self, motor_id: int) -> None: - """Home a specific motor axis.""" - if self._device is None: - return - axis = "AZ" if motor_id == 0 else "EL" - self.app.notify(f"Homing {axis}...", severity="information") - self._run_motor_command(self._device.home_motor, motor_id) - - def _handle_engage_toggle(self) -> None: - """Toggle motor engage/release state.""" - if self._device is None: - return - - if self._engaged: - self._run_motor_command(self._device.release) - self._engaged = False - self._update_engaged(False) - self.app.notify("Motors released") - else: - self._run_motor_command(self._device.engage) - self._engaged = True - self._update_engaged(True) - self.app.notify("Motors engaged") - - # ------------------------------------------------------------------ - # Key binding actions - # ------------------------------------------------------------------ - - def action_nudge_az(self, delta: int) -> None: - """Nudge azimuth by delta degrees.""" - if self._device is None: - return - new_az = self._last_az + delta - self._run_motor_command(self._device.move_motor, 0, new_az) - - def action_nudge_el(self, delta: int) -> None: - """Nudge elevation by delta degrees.""" - if self._device is None: - return - new_el = self._last_el + delta - self._run_motor_command(self._device.move_motor, 1, new_el) - - def action_home_both(self) -> None: - """Home both AZ and EL motors.""" - if self._device is None: - return - self.app.notify("Homing AZ + EL...", severity="information") - self._run_motor_command(self._device.home_motor, 0) - self._run_motor_command(self._device.home_motor, 1) - - def action_engage_motors(self) -> None: - """Engage (energize) stepper motors.""" - if self._device is None: - return - self._run_motor_command(self._device.engage) - self._engaged = True - self._update_engaged(True) - self.app.notify("Motors engaged") - - def action_release_motors(self) -> None: - """Release (de-energize) stepper motors.""" - if self._device is None: - return - self._run_motor_command(self._device.release) - self._engaged = False - self._update_engaged(False) - self.app.notify("Motors released") - - # ------------------------------------------------------------------ - # Helpers - # ------------------------------------------------------------------ - - @work(thread=True, exclusive=False, group="motor-cmd") - def _run_motor_command(self, fn, *args) -> None: - """Execute a motor command in a worker thread.""" - try: - fn(*args) - except Exception: - log.exception("Motor command failed") - self.app.call_from_thread( - self.app.notify, "Motor command failed", severity="error" - ) diff --git a/tui/src/birdcage_tui/screens/scan.py b/tui/src/birdcage_tui/screens/scan.py deleted file mode 100644 index 7be6fab..0000000 --- a/tui/src/birdcage_tui/screens/scan.py +++ /dev/null @@ -1,272 +0,0 @@ -"""F3 Scan Screen -- AZ sweep heatmap, sky mapping with configurable parameters. - -Grid-based sky scan: iterates over AZ/EL range, moves the dish to each point, -reads RSSI, and paints the result into a 2D heatmap. Supports CSV export of -raw (az, el, rssi) data for offline analysis. -""" - -import csv -import logging -from pathlib import Path - -from textual import work -from textual.containers import Container, Horizontal, Vertical -from textual.widgets import Button, Input, ProgressBar, Static -from textual.worker import get_current_worker - -from birdcage_tui.widgets.sky_heatmap import SkyHeatmap -from birdcage_tui.widgets.sparkline_widget import SparklineWidget - -log = logging.getLogger(__name__) - -# Type alias -- SerialBridge and DemoDevice share the same duck-typed interface. -DeviceLike = object - - -class ScanScreen(Container): - """F3: Sky scan and RF mapping.""" - - def __init__(self, **kwargs) -> None: - super().__init__(**kwargs) - self._device: DeviceLike | None = None - self._scanning = False - self._scan_data: list[tuple[float, float, float]] = [] - - def compose(self): - with Container(classes="screen-container"): - with Vertical(classes="panel"): - yield Static("Sky Scan", classes="panel-title") - yield SkyHeatmap(az_bins=40, el_bins=10, id="heatmap") - yield SparklineWidget( - max_points=80, label="Sweep RSSI", color="#00d4aa", id="sweep-spark" - ) - with Horizontal(classes="scan-status"): - yield Static("Idle", id="scan-status-text") - yield ProgressBar(id="scan-progress", total=100, show_eta=False) - with Horizontal(classes="bottom-controls"): - yield Static("AZ ", classes="label") - yield Input(value="160", id="az-start", type="number") - yield Static("-", classes="label") - yield Input(value="220", id="az-end", type="number") - yield Static(" Step ", classes="label") - yield Input(value="1.5", id="az-step", type="number") - yield Static(" EL ", classes="label") - yield Input(value="18", id="el-start", type="number") - yield Static("-", classes="label") - yield Input(value="65", id="el-end", type="number") - yield Static(" Step ", classes="label") - yield Input(value="5.0", id="el-step", type="number") - with Horizontal(classes="bottom-controls"): - yield Static("Transponders ", classes="label") - yield Input(value="3", id="xponder-input", type="integer") - yield Button("Start Scan", id="btn-start-scan", variant="primary") - yield Button("Stop", id="btn-stop-scan") - yield Button("Export CSV", id="btn-export") - - # ------------------------------------------------------------------ - # Device wiring - # ------------------------------------------------------------------ - - def set_device(self, device: DeviceLike) -> None: - """Store the device reference (SerialBridge or DemoDevice).""" - self._device = device - - # ------------------------------------------------------------------ - # Input helpers - # ------------------------------------------------------------------ - - def _read_float(self, widget_id: str, fallback: float) -> float: - """Read a float from an Input widget, returning *fallback* on parse error.""" - try: - return float(self.query_one(f"#{widget_id}", Input).value) - except (ValueError, TypeError): - return fallback - - def _read_int(self, widget_id: str, fallback: int) -> int: - """Read an int from an Input widget, returning *fallback* on parse error.""" - try: - return int(self.query_one(f"#{widget_id}", Input).value) - except (ValueError, TypeError): - return fallback - - # ------------------------------------------------------------------ - # Scan worker - # ------------------------------------------------------------------ - - @work(thread=True) - def _do_scan(self) -> None: - """Execute the AZ/EL grid scan in a background thread.""" - worker = get_current_worker() - shutdown = self.app.shutdown_event - device = self._device - if device is None: - return - - # Read scan parameters (widget access must happen via call_from_thread - # for Input.value, but Textual Input.value is a reactive that is safe - # to read from threads as a string snapshot). - az_start = self._read_float("az-start", 160.0) - az_end = self._read_float("az-end", 220.0) - az_step = self._read_float("az-step", 1.5) - el_start = self._read_float("el-start", 18.0) - el_end = self._read_float("el-end", 65.0) - el_step = self._read_float("el-step", 5.0) - iterations = self._read_int("xponder-input", 3) - - # Clamp step sizes to something sane. - if az_step <= 0: - az_step = 1.0 - if el_step <= 0: - el_step = 1.0 - - # Build the grid point list. - el_values: list[float] = [] - el = el_start - while el <= el_end + 1e-9: - el_values.append(round(el, 2)) - el += el_step - - az_values: list[float] = [] - az = az_start - while az <= az_end + 1e-9: - az_values.append(round(az, 2)) - az += az_step - - total_points = len(el_values) * len(az_values) - if total_points == 0: - self.app.call_from_thread( - self._set_status, "No grid points -- check parameters" - ) - return - - heatmap = self.query_one("#heatmap", SkyHeatmap) - spark = self.query_one("#sweep-spark", SparklineWidget) - - done = 0 - - for _el_idx, el_val in enumerate(el_values): - for _az_idx, az_val in enumerate(az_values): - if not self._scanning or worker.is_cancelled or shutdown.is_set(): - self.app.call_from_thread(self._set_status, "Scan stopped") - return - - # Move dish. - try: - device.move_to(az_val, el_val) - except Exception: - log.exception("move_to failed at AZ=%.2f EL=%.2f", az_val, el_val) - msg = f"Move error at AZ={az_val:.1f} EL={el_val:.1f}" - self.app.call_from_thread(self._set_status, msg) - continue - - # Settle time -- let the motor stop and vibrations damp. - shutdown.wait(0.3) - - # Read signal. - try: - rssi_data = device.get_rssi(iterations) - rssi = float(rssi_data.get("average", 0)) - except Exception: - log.exception("get_rssi failed at AZ=%.2f EL=%.2f", az_val, el_val) - rssi = 0.0 - - # Record raw data. - self._scan_data.append((az_val, el_val, rssi)) - - # Map to heatmap grid indices -- fit into fixed-size bins. - az_span = az_end - az_start + 1e-9 - el_span = el_end - el_start + 1e-9 - grid_az = min( - int((az_val - az_start) / az_span * heatmap.az_bins), - heatmap.az_bins - 1, - ) - grid_el = min( - int((el_val - el_start) / el_span * heatmap.el_bins), - heatmap.el_bins - 1, - ) - - # Update widgets. - self.app.call_from_thread(heatmap.set_point, grid_az, grid_el, rssi) - self.app.call_from_thread(heatmap.set_active, grid_az, grid_el) - self.app.call_from_thread(spark.push, rssi) - - done += 1 - pct = int(done * 100 / total_points) - status_text = ( - f"Scanning AZ={az_val:.1f} EL={el_val:.1f} " - f"RSSI={rssi:.0f} [{done}/{total_points}]" - ) - self.app.call_from_thread(self._set_progress, pct, status_text) - - msg = f"Scan complete -- {total_points} points" - self.app.call_from_thread(self._set_status, msg) - - # ------------------------------------------------------------------ - # Widget update helpers (called via call_from_thread) - # ------------------------------------------------------------------ - - def _set_status(self, text: str) -> None: - """Update the scan status text label.""" - self.query_one("#scan-status-text", Static).update(text) - - def _set_progress(self, pct: int, status_text: str) -> None: - """Update both progress bar and status text.""" - self.query_one("#scan-progress", ProgressBar).update(progress=pct) - self.query_one("#scan-status-text", Static).update(status_text) - - # ------------------------------------------------------------------ - # Button handlers - # ------------------------------------------------------------------ - - def on_button_pressed(self, event: Button.Pressed) -> None: - button_id = event.button.id or "" - - if button_id == "btn-start-scan": - self._start_scan() - elif button_id == "btn-stop-scan": - self._stop_scan() - elif button_id == "btn-export": - self._export_csv() - - def _start_scan(self) -> None: - """Clear state and kick off the scan worker.""" - if self._device is None: - self.app.notify("No device connected", severity="warning") - return - - if self._scanning: - self.app.notify("Scan already in progress", severity="warning") - return - - # Reset. - heatmap = self.query_one("#heatmap", SkyHeatmap) - heatmap.clear() - self._scan_data.clear() - self.query_one("#scan-progress", ProgressBar).update(progress=0) - self._set_status("Starting scan...") - - self._scanning = True - self._do_scan() - - def _stop_scan(self) -> None: - """Signal the scan worker to stop.""" - self._scanning = False - self._set_status("Stopping...") - - def _export_csv(self) -> None: - """Write scan data to /tmp/birdcage_scan.csv.""" - if not self._scan_data: - self.app.notify("No scan data to export", severity="warning") - return - - output = Path("/tmp/birdcage_scan.csv") - try: - with output.open("w", newline="") as fh: - writer = csv.writer(fh) - writer.writerow(["az", "el", "rssi"]) - for az, el, rssi in self._scan_data: - writer.writerow([f"{az:.2f}", f"{el:.2f}", f"{rssi:.1f}"]) - self.app.notify(f"Exported {len(self._scan_data)} points to {output}") - except OSError as exc: - log.exception("CSV export failed") - self.app.notify(f"Export failed: {exc}", severity="error") diff --git a/tui/src/birdcage_tui/screens/system.py b/tui/src/birdcage_tui/screens/system.py index 13719a0..31d20e8 100644 --- a/tui/src/birdcage_tui/screens/system.py +++ b/tui/src/birdcage_tui/screens/system.py @@ -1,8 +1,7 @@ -"""F4 System Screen -- NVS table, A3981 diagnostics, motor dynamics, firmware info. +"""F4 System Screen -- hardware info, motor tuning, NVS configuration. -Aggregates hardware identity, stepper driver status, motor tuning parameters, -and the full non-volatile storage table into a single dashboard panel. All data -is fetched from the device in a background worker thread and pushed to widgets +Three sub-modes via ModeBar: Hardware, Motors, NVS Config. All data is +fetched from the device in background worker threads and pushed to widgets via call_from_thread. """ @@ -13,10 +12,14 @@ from pathlib import Path from rich.text import Text from textual import work +from textual.app import ComposeResult from textual.containers import Container, Horizontal, Vertical -from textual.widgets import Button, Static +from textual.widgets import Button, ContentSwitcher, Static from textual.worker import get_current_worker +from birdcage_tui.widgets.mode_bar import ModeBar +from birdcage_tui.widgets.motor_tuning import MotorTuning +from birdcage_tui.widgets.nvs_filter import NvsFilter from birdcage_tui.widgets.nvs_table import NvsTable log = logging.getLogger(__name__) @@ -162,31 +165,70 @@ def _format_motor_dynamics( class SystemScreen(Container): - """F4: System information, NVS, and diagnostics.""" + """F4: System hardware, motor tuning, and NVS configuration.""" def __init__(self, **kwargs) -> None: super().__init__(**kwargs) self._device: DeviceLike | None = None self._refreshed = False + self._nvs_raw: str = "" + self._nvs_parsed: list[dict[str, str]] = [] - def compose(self): + def compose(self) -> ComposeResult: with Container(classes="screen-container"): - with Horizontal(classes="panel"): - yield Static("", id="firmware-info") - with Horizontal(classes="top-row"): - with Vertical(classes="panel"): - yield Static("A3981 Diagnostics", classes="panel-title") - yield Static("", id="a3981-diag") - with Vertical(classes="panel"): - yield Static("Motor Dynamics", classes="panel-title") - yield Static("", id="motor-dynamics") - with Vertical(classes="panel"): - yield Static("NVS Table", classes="panel-title") - yield NvsTable(id="nvs-table") - with Horizontal(classes="bottom-controls"): - yield Button("Refresh All", id="btn-refresh-all", variant="primary") - yield Button("Refresh NVS", id="btn-refresh-nvs") - yield Button("Export NVS JSON", id="btn-export-nvs") + yield ModeBar( + modes={ + "hardware": "Hardware", + "motors": "Motors", + "nvs": "NVS Config", + }, + initial="hardware", + classes="mode-bar", + ) + with ContentSwitcher(id="system-modes", initial="hardware"): + # -- Hardware sub-mode -- + with Container(id="hardware"): + with Vertical(classes="panel"): + yield Static("Firmware", classes="panel-title") + yield Static("", id="firmware-info") + with Vertical(classes="panel"): + yield Static("A3981 Stepper Drivers", classes="panel-title") + yield Static("", id="a3981-diag") + with Horizontal(classes="bottom-controls"): + yield Button( + "Refresh All", + id="btn-refresh-all", + variant="primary", + ) + yield Button("Reset A3981 Faults", id="btn-reset-a3981") + + # -- Motors sub-mode -- + with Container(id="motors"): + with Vertical(classes="panel"): + yield Static("Motor Dynamics", classes="panel-title") + yield Static("", id="motor-dynamics") + yield MotorTuning(id="motor-tuning", classes="motor-tuning") + + # -- NVS Config sub-mode -- + with Container(id="nvs"): + yield NvsFilter(id="nvs-filter", classes="nvs-filter") + with Vertical(classes="panel"): + yield NvsTable(id="nvs-table") + with Horizontal(classes="bottom-controls"): + yield Button( + "Refresh NVS", + id="btn-refresh-nvs", + variant="primary", + ) + yield Button("Export NVS JSON", id="btn-export-nvs") + + # ------------------------------------------------------------------ + # Mode switching + # ------------------------------------------------------------------ + + def on_mode_bar_mode_changed(self, event: ModeBar.ModeChanged) -> None: + """Switch the system sub-mode ContentSwitcher.""" + self.query_one("#system-modes", ContentSwitcher).current = event.mode # ------------------------------------------------------------------ # Device wiring @@ -208,8 +250,11 @@ class SystemScreen(Container): if self._device is not None and not self._refreshed: self._do_system_refresh() + def on_position_update(self, az: float, el: float) -> None: + """System screen does not need position updates.""" + # ------------------------------------------------------------------ - # System refresh worker + # System refresh worker (all panels) # ------------------------------------------------------------------ @work(thread=True) @@ -271,14 +316,34 @@ class SystemScreen(Container): Text("Error reading motor dynamics", style="#e04040"), ) + # 3b. PID gains (populate the tuning widget). + try: + gains = device.get_pid_gains() + az_g = gains.get("az", {}) + el_g = gains.get("el", {}) + self.app.call_from_thread( + self._apply_pid_gains, + az_g.get("kp", 600), + az_g.get("kv", 60), + az_g.get("ki", 1), + el_g.get("kp", 250), + el_g.get("kv", 50), + el_g.get("ki", 1), + ) + except Exception: + log.debug("PID gain read failed", exc_info=True) + if worker.is_cancelled: return # 4. NVS dump. try: nvs_text = device.nvs_dump() + self._nvs_raw = nvs_text nvs_table = self.query_one("#nvs-table", NvsTable) self.app.call_from_thread(nvs_table.load_nvs, nvs_text) + # Cache parsed rows for filtering. + self.app.call_from_thread(self._cache_nvs_rows) except Exception: log.exception("Failed to dump NVS") self.app.call_from_thread( @@ -287,6 +352,24 @@ class SystemScreen(Container): self._refreshed = True + def _apply_pid_gains( + self, + az_kp: float, + az_kv: float, + az_ki: float, + el_kp: float, + el_kv: float, + el_ki: float, + ) -> None: + """Push device-reported PID gains into the MotorTuning widget inputs.""" + tuning = self.query_one("#motor-tuning", MotorTuning) + tuning.load_gains(az_kp, az_kv, az_ki, el_kp, el_kv, el_ki) + + def _cache_nvs_rows(self) -> None: + """Snapshot parsed NVS rows from the table for filter operations.""" + nvs_table = self.query_one("#nvs-table", NvsTable) + self._nvs_parsed = nvs_table.parsed_rows + # ------------------------------------------------------------------ # NVS-only refresh worker # ------------------------------------------------------------------ @@ -300,8 +383,10 @@ class SystemScreen(Container): try: nvs_text = device.nvs_dump() + self._nvs_raw = nvs_text nvs_table = self.query_one("#nvs-table", NvsTable) self.app.call_from_thread(nvs_table.load_nvs, nvs_text) + self.app.call_from_thread(self._cache_nvs_rows) self.app.call_from_thread(self.app.notify, "NVS table refreshed") except Exception: log.exception("Failed to refresh NVS") @@ -309,6 +394,78 @@ class SystemScreen(Container): self.app.notify, "NVS refresh failed", severity="error" ) + # ------------------------------------------------------------------ + # A3981 fault reset worker + # ------------------------------------------------------------------ + + @work(thread=True) + def _do_a3981_reset(self) -> None: + """Reset A3981 fault flags via the A3981 submenu.""" + device = self._device + if device is None: + return + + try: + device.send_raw("a3981") + device.send_raw("reset") + device.send_raw("q") + self.app.call_from_thread(self.app.notify, "A3981 faults reset") + # Re-read diagnostics after reset. + try: + diag = device.get_a3981_diag() + modes = device.get_a3981_modes() + torque = device.get_a3981_torque() + a3981_text = _format_a3981(diag, modes, torque) + self.app.call_from_thread( + self.query_one("#a3981-diag", Static).update, a3981_text + ) + except Exception: + log.debug("A3981 re-read after reset failed", exc_info=True) + except Exception: + log.exception("A3981 fault reset failed") + self.app.call_from_thread( + self.app.notify, "A3981 reset failed", severity="error" + ) + + # ------------------------------------------------------------------ + # NVS filter handler + # ------------------------------------------------------------------ + + def on_nvs_filter_filter_changed(self, event: NvsFilter.FilterChanged) -> None: + """Re-filter the NVS table based on search text and modified toggle.""" + self._apply_nvs_filter(event.text, event.modified_only) + + def _apply_nvs_filter(self, text: str, modified_only: bool) -> None: + """Clear the NVS DataTable and re-add only matching rows.""" + nvs_table = self.query_one("#nvs-table", NvsTable) + nvs_table.clear() + + needle = text.strip().lower() + + for row in self._nvs_parsed: + # Modified-only filter: skip rows where current == default. + if modified_only and row["current"] == row["default"]: + continue + + # Text filter: match against index or name (case-insensitive). + if needle: + idx_match = needle in row["idx"].lower() + name_match = needle in row["name"].lower() + if not (idx_match or name_match): + continue + + # Row passes all filters -- add it to the table. + modified = row["current"] != row["default"] + label = f"*{row['idx']}" if modified else row["idx"] + nvs_table.add_row( + label, + row["name"], + row["current"], + row["saved"], + row["default"], + key=f"nvs-{row['idx']}", + ) + # ------------------------------------------------------------------ # Button handlers # ------------------------------------------------------------------ @@ -322,12 +479,15 @@ class SystemScreen(Container): self._handle_refresh_nvs() elif button_id == "btn-export-nvs": self._export_nvs_json() + elif button_id == "btn-reset-a3981": + self._handle_reset_a3981() def _handle_refresh_all(self) -> None: """Kick off a full system refresh.""" if self._device is None: self.app.notify("No device connected", severity="warning") return + self._refreshed = False self._do_system_refresh() def _handle_refresh_nvs(self) -> None: @@ -337,10 +497,16 @@ class SystemScreen(Container): return self._do_nvs_refresh() + def _handle_reset_a3981(self) -> None: + """Reset A3981 stepper driver fault flags.""" + if self._device is None: + self.app.notify("No device connected", severity="warning") + return + self._do_a3981_reset() + def _export_nvs_json(self) -> None: """Export parsed NVS rows to /tmp/birdcage_nvs.json.""" - nvs_table = self.query_one("#nvs-table", NvsTable) - rows = nvs_table.parsed_rows + rows = self._nvs_parsed if not rows: self.app.notify( @@ -356,3 +522,56 @@ class SystemScreen(Container): except OSError as exc: log.exception("NVS JSON export failed") self.app.notify(f"Export failed: {exc}", severity="error") + + # ------------------------------------------------------------------ + # Motor tuning handler + # ------------------------------------------------------------------ + + def on_motor_tuning_apply_requested( + self, event: MotorTuning.ApplyRequested + ) -> None: + """Handle PID gain apply request from the MotorTuning widget.""" + if self._device is None: + self.app.notify("No device connected", severity="warning") + return + log.info( + "PID apply requested: AZ(%.0f/%.0f/%.0f) EL(%.0f/%.0f/%.0f)", + event.az_kp, + event.az_kv, + event.az_ki, + event.el_kp, + event.el_kv, + event.el_ki, + ) + self._do_pid_write( + event.az_kp, + event.az_kv, + event.az_ki, + event.el_kp, + event.el_kv, + event.el_ki, + ) + + @work(thread=True) + def _do_pid_write( + self, + az_kp: float, + az_kv: float, + az_ki: float, + el_kp: float, + el_kv: float, + el_ki: float, + ) -> None: + """Write PID gains to both motor axes in a worker thread.""" + device = self._device + if device is None: + return + try: + device.set_pid_gains(0, az_kp, az_kv, az_ki) + device.set_pid_gains(1, el_kp, el_kv, el_ki) + self.app.call_from_thread(self.app.notify, "PID gains applied to AZ + EL") + except Exception: + log.exception("PID write failed") + self.app.call_from_thread( + self.app.notify, "PID write failed", severity="error" + ) diff --git a/tui/src/birdcage_tui/theme.tcss b/tui/src/birdcage_tui/theme.tcss index f8f351d..3f32a6a 100644 --- a/tui/src/birdcage_tui/theme.tcss +++ b/tui/src/birdcage_tui/theme.tcss @@ -1,6 +1,8 @@ /* Birdcage TUI — Dark RF Theme * Teal accent on deep blue-black. No purple. * Signal gradient: blue > cyan > green > yellow > red + * + * Layout: Header / StatusStrip / TabBar / ContentSwitcher / Footer */ /* ── Global ────────────────────────────────────────── */ @@ -25,49 +27,31 @@ Footer { height: 1; } -/* ── Layout Containers ─────────────────────────────── */ +/* ── Status Strip ─────────────────────────────────── */ -#main-area { - layout: horizontal; - height: 1fr; -} - -#sidebar { - width: 26; +#status-strip { + dock: top; + height: 1; background: #0e1420; - border-right: solid #1a2a3a; - padding: 1 1; + color: #c8d0d8; + border-bottom: solid #1a2a3a; + padding: 0 1; } -.sidebar-title { - color: #00d4aa; - text-style: bold; - text-align: center; - width: 100%; -} +/* ── Tab Bar ──────────────────────────────────────── */ -.sidebar-subtitle { - color: #506878; - text-align: center; - width: 100%; - margin: 0 0 1 0; -} - -#content-area { - width: 1fr; -} - -ContentSwitcher { - width: 1fr; - height: 1fr; -} - -/* ── Sidebar Buttons ───────────────────────────────── */ - -.sidebar-btn { - width: 100%; +#tab-bar { + dock: top; height: 3; - margin: 0 0 1 0; + background: #0a0a12; + padding: 0 1; + layout: horizontal; +} + +.tab-btn { + min-width: 16; + height: 3; + margin: 0 0 0 0; background: #121c2a; color: #7090a8; text-style: bold; @@ -75,19 +59,84 @@ ContentSwitcher { text-align: center; } -.sidebar-btn:hover { +.tab-btn:hover { background: #1a2a40; color: #00d4aa; border: round #00d4aa; } -.sidebar-btn.active { +.tab-btn.active { background: #0a2a3a; color: #00d4aa; border: round #00d4aa; text-style: bold; } +/* ── Mode Bar (within-screen sub-mode toggle) ─────── */ + +.mode-bar { + height: 3; + background: #0a0a12; + padding: 0 1; + layout: horizontal; + dock: top; +} + +.mode-btn { + min-width: 14; + height: 3; + margin: 0 0 0 0; + background: #0e1420; + color: #506878; + text-style: bold; + border: round #1a2a3a; + text-align: center; +} + +.mode-btn:hover { + background: #1a2a40; + color: #00d4aa; + border: round #00d4aa; +} + +.mode-btn.active { + background: #0a2a3a; + color: #00d4aa; + border: round #00d4aa; + text-style: bold; +} + +/* ── Content Area ─────────────────────────────────── */ + +#content-area { + width: 1fr; + height: 1fr; +} + +ContentSwitcher { + width: 1fr; + height: 1fr; +} + +/* ── Console Overlay (ModalScreen) ────────────────── */ + +ConsoleOverlay { + background: rgba(10, 10, 18, 0.6); +} + +#console-overlay { + dock: bottom; + height: 50%; + width: 100%; + background: #0a0a12; + border-top: double #00d4aa; + padding: 0; +} + +#console-overlay #serial-log { + height: 1fr; +} + /* ── Panel / Card ──────────────────────────────────── */ .panel { @@ -177,6 +226,18 @@ Button.-active { background: #0a3a3a; } +/* ── Checkbox ──────────────────────────────────────── */ + +Checkbox { + background: transparent; + color: #c8d0d8; + padding: 0 1; +} + +Checkbox:focus { + color: #00d4aa; +} + /* ── DataTable ─────────────────────────────────────── */ DataTable { @@ -338,44 +399,23 @@ ProgressBar PercentageStatus { color: #c8d0d8; } -/* ── Device Status Bar (sidebar bottom) ────────────── */ - -#device-status { - dock: bottom; - height: auto; - padding: 1; - background: #0e1420; - border-top: solid #1a2a38; -} - -.device-status-label { - color: #506878; -} - -.device-status-value { - color: #c8d0d8; -} - -.device-connected { - color: #00e060; -} - -.device-demo { - color: #e8a020; -} - -/* ── Console Screen ────────────────────────────────── */ +/* ── Console Input Area ───────────────────────────── */ .console-input-area { dock: bottom; - height: 3; + height: auto; + width: 100%; layout: horizontal; padding: 0 1; background: #0e1420; border-top: solid #1a2a38; } -.console-input-area Input { +.console-input-area .label { + width: auto; +} + +#console-input { width: 1fr; } @@ -426,7 +466,125 @@ ProgressBar PercentageStatus { color: #506878; } -/* ── Scan Screen ───────────────────────────────────── */ +/* ── Quick Actions (Dashboard) ────────────────────── */ + +.quick-actions { + height: auto; + layout: horizontal; + padding: 0 1; +} + +.quick-action-btn { + min-width: 16; + height: 3; + margin: 0 1 0 0; + background: #1a2a40; + color: #00d4aa; + border: round #1a3050; + text-style: bold; +} + +.quick-action-btn:hover { + background: #00d4aa; + color: #0a0a12; + border: round #00d4aa; +} + +/* ── System Health (Dashboard) ────────────────────── */ + +.system-health { + height: auto; + padding: 1 2; + background: #0e1420; + border: round #1a2a3a; + margin: 0 1; +} + +/* ── Preset List ──────────────────────────────────── */ + +.preset-controls { + height: auto; + layout: horizontal; + padding: 0 1; + dock: bottom; +} + +.preset-controls Button { + margin-right: 1; +} + +/* ── Tracking Panel ───────────────────────────────── */ + +.tracking-panel { + height: auto; + padding: 1 2; + background: #0e1420; + border: round #1a2a3a; +} + +.tracking-status { + color: #506878; +} + +/* ── Receiver Info ────────────────────────────────── */ + +.receiver-info { + height: auto; + padding: 1 2; + background: #0e1420; + border: round #1a2a3a; +} + +/* ── Sweep Plot ───────────────────────────────────── */ + +.sweep-plot { + height: auto; + min-height: 8; + padding: 1 2; + background: #0e1420; + border: round #1a2a3a; +} + +/* ── NVS Filter ───────────────────────────────────── */ + +.nvs-filter { + height: auto; + layout: horizontal; + padding: 0 1; + dock: top; + background: #0e1420; + border-bottom: solid #1a2a3a; +} + +.nvs-filter Input { + width: 1fr; + margin-right: 1; +} + +/* ── Motor Tuning (PID editor) ────────────────────── */ + +.motor-tuning { + height: auto; + padding: 1 2; + background: #0e1420; + border: round #1a2a3a; +} + +.pid-row { + layout: horizontal; + height: 3; +} + +.pid-row .label { + width: 4; +} + +.pid-row Input { + width: 8; + margin-right: 1; +} + +/* ── Scan Controls ────────────────────────────────── */ .scan-controls { dock: bottom; @@ -446,8 +604,8 @@ ProgressBar PercentageStatus { .screen-container { layout: vertical; - height: 1fr; - width: 1fr; + height: 100%; + width: 100%; } .top-row { diff --git a/tui/src/birdcage_tui/widgets/__init__.py b/tui/src/birdcage_tui/widgets/__init__.py index ea4216e..5e7dcf0 100644 --- a/tui/src/birdcage_tui/widgets/__init__.py +++ b/tui/src/birdcage_tui/widgets/__init__.py @@ -1,21 +1,39 @@ """Custom widgets for the Birdcage TUI.""" from birdcage_tui.widgets.compass_rose import CompassRose -from birdcage_tui.widgets.device_status_bar import DeviceStatusBar +from birdcage_tui.widgets.mode_bar import ModeBar from birdcage_tui.widgets.motor_status import MotorStatus +from birdcage_tui.widgets.motor_tuning import MotorTuning +from birdcage_tui.widgets.nvs_filter import NvsFilter from birdcage_tui.widgets.nvs_table import NvsTable +from birdcage_tui.widgets.preset_list import PresetList +from birdcage_tui.widgets.quick_actions import QuickActions +from birdcage_tui.widgets.receiver_info import ReceiverInfo from birdcage_tui.widgets.serial_log import SerialLog from birdcage_tui.widgets.signal_gauge import SignalGauge from birdcage_tui.widgets.sky_heatmap import SkyHeatmap from birdcage_tui.widgets.sparkline_widget import SparklineWidget +from birdcage_tui.widgets.status_strip import StatusStrip +from birdcage_tui.widgets.sweep_plot import SweepPlot +from birdcage_tui.widgets.system_health import SystemHealthPanel +from birdcage_tui.widgets.tracking_panel import TrackingPanel __all__ = [ "CompassRose", - "DeviceStatusBar", + "ModeBar", "MotorStatus", + "MotorTuning", + "NvsFilter", "NvsTable", + "PresetList", + "QuickActions", + "ReceiverInfo", "SerialLog", "SignalGauge", "SkyHeatmap", "SparklineWidget", + "StatusStrip", + "SweepPlot", + "SystemHealthPanel", + "TrackingPanel", ] diff --git a/tui/src/birdcage_tui/widgets/device_status_bar.py b/tui/src/birdcage_tui/widgets/device_status_bar.py deleted file mode 100644 index 7f72c29..0000000 --- a/tui/src/birdcage_tui/widgets/device_status_bar.py +++ /dev/null @@ -1,92 +0,0 @@ -"""Device status bar widget — sidebar display of connection state and firmware info.""" - -from rich.text import Text -from textual.widgets import Static - - -class DeviceStatusBar(Static): - """Sidebar status display showing connection state and firmware info.""" - - def __init__(self, **kwargs) -> None: - super().__init__(**kwargs) - self._connected: bool = False - self._demo: bool = False - self._firmware: str = "---" - self._submenu: str = "---" - self._port: str = "---" - - def set_device(self, device: object) -> None: - """Accept a device reference and update the status display.""" - is_demo = hasattr(device, "demo_mode") or type(device).__name__ == "DemoDevice" - fw = getattr(device, "firmware_id", "02.02.48") if device else "---" - port = getattr(self.app, "serial_port", "/dev/ttyUSB0") if self.app else "---" - submenu = getattr(device, "current_menu", "TRK>") if device else "---" - connected = device is not None and not is_demo - self.update_status( - connected=connected, - demo=is_demo, - firmware=str(fw), - submenu=str(submenu), - port=str(port), - ) - - def update_status( - self, - connected: bool, - demo: bool, - firmware: str, - submenu: str, - port: str, - ) -> None: - """Update all status fields and refresh the display.""" - self._connected = connected - self._demo = demo - self._firmware = firmware - self._submenu = submenu - self._port = port - self.refresh() - - def render(self) -> Text: - result = Text() - label_w = 8 - - # Status row - result.append("Status".ljust(label_w), style="#506878") - if self._connected: - result.append("Connected", style="#00e060 bold") - elif self._demo: - result.append("Demo", style="#e8a020 italic") - else: - result.append("Offline", style="#e04040") - result.append("\n") - - # Port row - result.append("Port".ljust(label_w), style="#506878") - result.append(self._port, style="#c8d0d8") - result.append("\n") - - # Firmware row - result.append("FW".ljust(label_w), style="#506878") - result.append(self._firmware, style="#c8d0d8") - result.append("\n") - - # Menu row - result.append("Menu".ljust(label_w), style="#506878") - # Color the menu prompt with its matching prompt color - submenu_colors: dict[str, str] = { - "TRK>": "#00d4aa", - "MOT>": "#00e060", - "DVB>": "#2080d0", - "NVS>": "#e8a020", - "A3981>": "#00b8c8", - "STEP>": "#40c0a0", - "EE>": "#e8a020", - "OS>": "#8090a0", - "ADC>": "#00b8c8", - "GPIO>": "#40c0a0", - "PEAK>": "#e8c020", - } - menu_color = submenu_colors.get(self._submenu, "#c8d0d8") - result.append(self._submenu, style=f"{menu_color} bold") - - return result diff --git a/tui/src/birdcage_tui/widgets/mode_bar.py b/tui/src/birdcage_tui/widgets/mode_bar.py new file mode 100644 index 0000000..995393a --- /dev/null +++ b/tui/src/birdcage_tui/widgets/mode_bar.py @@ -0,0 +1,72 @@ +"""Mode bar widget -- button bar for ContentSwitcher sub-modes.""" + +from textual.app import ComposeResult +from textual.containers import Horizontal +from textual.message import Message +from textual.widgets import Button + + +class ModeBar(Horizontal): + """Horizontal bar of toggle buttons that switch a ContentSwitcher. + + Used inside screens to switch between sub-modes (e.g., Manual/Presets/Track + within the Control screen, or Monitor/Sweep/SkyMap within Signal). + + Posts a ``ModeBar.ModeChanged`` message when the active mode changes. + The parent screen should watch for this and update its ContentSwitcher. + """ + + class ModeChanged(Message): + """Posted when the user selects a different mode.""" + + def __init__(self, mode: str) -> None: + super().__init__() + self.mode = mode + + def __init__( + self, + modes: dict[str, str], + initial: str | None = None, + **kwargs, + ) -> None: + """Create a mode bar. + + Args: + modes: Mapping of mode_key -> display label. + initial: Which mode to highlight initially. Defaults to the first key. + """ + super().__init__(**kwargs) + self._modes = modes + self._initial = initial or next(iter(modes)) + + def compose(self) -> ComposeResult: + for key, label in self._modes.items(): + btn = Button(label, id=f"mode-{key}", classes="mode-btn") + if key == self._initial: + btn.add_class("active") + yield btn + + def on_button_pressed(self, event: Button.Pressed) -> None: + button_id = event.button.id or "" + if not button_id.startswith("mode-"): + return + + mode = button_id.removeprefix("mode-") + if mode not in self._modes: + return + + # Update button highlight + for btn in self.query(".mode-btn"): + btn.remove_class("active") + event.button.add_class("active") + + self.post_message(self.ModeChanged(mode)) + event.stop() + + @property + def active_mode(self) -> str: + """Return the currently active mode key.""" + for btn in self.query(".mode-btn.active"): + btn_id = btn.id or "" + return btn_id.removeprefix("mode-") + return self._initial diff --git a/tui/src/birdcage_tui/widgets/motor_tuning.py b/tui/src/birdcage_tui/widgets/motor_tuning.py new file mode 100644 index 0000000..db77577 --- /dev/null +++ b/tui/src/birdcage_tui/widgets/motor_tuning.py @@ -0,0 +1,130 @@ +"""Motor tuning widget -- PID gain editor for AZ and EL motor control loops.""" + +from textual.app import ComposeResult +from textual.containers import Container, Horizontal, Vertical +from textual.message import Message +from textual.widgets import Button, Input, Static + + +class MotorTuning(Container): + """PID gain editor for AZ and EL motor control loops. + + Displays two rows of labeled inputs (Kp, Kv, Ki for each axis) and an + Apply button. The button uses ``variant="warning"`` because writing PID + gains takes effect immediately on the live motor control loop. + + The parent screen should confirm the action before sending the values + to the firmware via ``mot pid ``. + """ + + class ApplyRequested(Message): + """Posted when the user clicks Apply PID. + + The parent screen should validate and confirm before writing to + the firmware, since PID changes affect motor behavior immediately. + """ + + def __init__( + self, + az_kp: float, + az_kv: float, + az_ki: float, + el_kp: float, + el_kv: float, + el_ki: float, + ) -> None: + super().__init__() + self.az_kp = az_kp + self.az_kv = az_kv + self.az_ki = az_ki + self.el_kp = el_kp + self.el_kv = el_kv + self.el_ki = el_ki + + def compose(self) -> ComposeResult: + yield Static("PID Tuning", classes="panel-title") + + with Vertical(): + # AZ row + with Horizontal(classes="pid-row"): + yield Static("AZ", classes="pid-axis-label") + yield Static("Kp:", classes="pid-gain-label") + yield Input(value="600", id="pid-az-kp", type="number") + yield Static("Kv:", classes="pid-gain-label") + yield Input(value="60", id="pid-az-kv", type="number") + yield Static("Ki:", classes="pid-gain-label") + yield Input(value="1", id="pid-az-ki", type="number") + + # EL row + with Horizontal(classes="pid-row"): + yield Static("EL", classes="pid-axis-label") + yield Static("Kp:", classes="pid-gain-label") + yield Input(value="250", id="pid-el-kp", type="number") + yield Static("Kv:", classes="pid-gain-label") + yield Input(value="50", id="pid-el-kv", type="number") + yield Static("Ki:", classes="pid-gain-label") + yield Input(value="1", id="pid-el-ki", type="number") + + # Button row + with Horizontal(classes="pid-button-row"): + yield Button( + "Apply PID", + id="pid-apply", + variant="warning", + ) + + def load_gains( + self, + az_kp: float, + az_kv: float, + az_ki: float, + el_kp: float, + el_kv: float, + el_ki: float, + ) -> None: + """Populate all input fields from device-reported PID gains. + + Args: + az_kp: Azimuth proportional gain. + az_kv: Azimuth velocity gain. + az_ki: Azimuth integral gain. + el_kp: Elevation proportional gain. + el_kv: Elevation velocity gain. + el_ki: Elevation integral gain. + """ + self.query_one("#pid-az-kp", Input).value = str(int(az_kp)) + self.query_one("#pid-az-kv", Input).value = str(int(az_kv)) + self.query_one("#pid-az-ki", Input).value = str(int(az_ki)) + self.query_one("#pid-el-kp", Input).value = str(int(el_kp)) + self.query_one("#pid-el-kv", Input).value = str(int(el_kv)) + self.query_one("#pid-el-ki", Input).value = str(int(el_ki)) + + def on_button_pressed(self, event: Button.Pressed) -> None: + """Handle the Apply PID button press.""" + if event.button.id != "pid-apply": + return + + try: + az_kp = float(self.query_one("#pid-az-kp", Input).value) + az_kv = float(self.query_one("#pid-az-kv", Input).value) + az_ki = float(self.query_one("#pid-az-ki", Input).value) + el_kp = float(self.query_one("#pid-el-kp", Input).value) + el_kv = float(self.query_one("#pid-el-kv", Input).value) + el_ki = float(self.query_one("#pid-el-ki", Input).value) + except ValueError: + # Non-numeric input -- do not post the message. + # The Input widget's type="number" constraint should prevent this + # in normal usage, but guard against edge cases. + return + + self.post_message( + self.ApplyRequested( + az_kp=az_kp, + az_kv=az_kv, + az_ki=az_ki, + el_kp=el_kp, + el_kv=el_kv, + el_ki=el_ki, + ) + ) + event.stop() diff --git a/tui/src/birdcage_tui/widgets/nvs_filter.py b/tui/src/birdcage_tui/widgets/nvs_filter.py new file mode 100644 index 0000000..66d3721 --- /dev/null +++ b/tui/src/birdcage_tui/widgets/nvs_filter.py @@ -0,0 +1,72 @@ +"""NVS filter widget -- text search and modified-only toggle for NVS table filtering.""" + +from textual.app import ComposeResult +from textual.containers import Horizontal +from textual.message import Message +from textual.widgets import Checkbox, Input, Static + + +class NvsFilter(Horizontal): + """Filter bar for the NVS table: text search + show-modified-only toggle. + + Composes a label, text input for searching NVS entries by name or index, + and a checkbox to restrict display to entries where current != default. + + Posts ``NvsFilter.FilterChanged`` when either control changes, so the + parent screen can re-filter the NVS DataTable rows. + """ + + class FilterChanged(Message): + """Posted when the filter text or modified-only toggle changes.""" + + def __init__(self, text: str, modified_only: bool) -> None: + super().__init__() + self.text = text + self.modified_only = modified_only + + def compose(self) -> ComposeResult: + yield Static("Filter: ", classes="label") + yield Input( + placeholder="search by name or index...", + id="nvs-filter-input", + ) + yield Checkbox( + "Modified only", + id="nvs-filter-modified", + value=False, + ) + + def on_input_changed(self, event: Input.Changed) -> None: + """Re-post filter state when search text changes.""" + if event.input.id != "nvs-filter-input": + return + self._post_filter() + event.stop() + + def on_checkbox_changed(self, event: Checkbox.Changed) -> None: + """Re-post filter state when the modified-only toggle changes.""" + if event.checkbox.id != "nvs-filter-modified": + return + self._post_filter() + event.stop() + + def _post_filter(self) -> None: + """Read current control values and post a FilterChanged message.""" + text_input = self.query_one("#nvs-filter-input", Input) + checkbox = self.query_one("#nvs-filter-modified", Checkbox) + self.post_message( + self.FilterChanged( + text=text_input.value, + modified_only=checkbox.value, + ) + ) + + @property + def filter_text(self) -> str: + """Current text in the search input.""" + return self.query_one("#nvs-filter-input", Input).value + + @property + def modified_only(self) -> bool: + """Whether the modified-only checkbox is checked.""" + return self.query_one("#nvs-filter-modified", Checkbox).value diff --git a/tui/src/birdcage_tui/widgets/preset_list.py b/tui/src/birdcage_tui/widgets/preset_list.py new file mode 100644 index 0000000..0c6193b --- /dev/null +++ b/tui/src/birdcage_tui/widgets/preset_list.py @@ -0,0 +1,216 @@ +"""Preset list widget -- saved AZ/EL target presets backed by JSON file.""" + +import json +import logging +from pathlib import Path + +from textual.app import ComposeResult +from textual.containers import Container, Horizontal +from textual.message import Message +from textual.widgets import Button, DataTable, Input + +log = logging.getLogger(__name__) + +PRESETS_PATH = Path.home() / ".config" / "birdcage" / "presets.json" + + +class PresetList(Container): + """DataTable of saved AZ/EL presets with save/go/delete actions. + + File format:: + + { + "targets": [ + {"name": "Zenith", "az": null, "el": 65.0, "notes": "EL-only"}, + {"name": "South", "az": 180.0, "el": 45.0, "notes": ""} + ] + } + + An ``az`` value of ``null`` means "don't move AZ" (EL-only targets). + """ + + class GoToPreset(Message): + """Posted when the user presses Go on a selected preset.""" + + def __init__(self, az: float | None, el: float) -> None: + super().__init__() + self.az = az + self.el = el + + class SaveRequested(Message): + """Posted when the user presses Save Current. + + The parent screen should read the current position and call + ``save_preset()`` with a name and the current AZ/EL. + """ + + def __init__(self, **kwargs) -> None: + super().__init__(**kwargs) + self._presets: list[dict] = [] + self._table_ready = False + + def compose(self) -> ComposeResult: + yield DataTable(id="preset-table") + with Horizontal(classes="preset-controls"): + yield Input(placeholder="Preset name", id="preset-name-input") + yield Button("Save Current", id="btn-preset-save") + yield Button("Go", id="btn-preset-go") + yield Button("Delete", id="btn-preset-delete") + + def on_mount(self) -> None: + """Add columns and load presets when mounted.""" + table = self.query_one("#preset-table", DataTable) + table.add_columns("Name", "AZ", "EL", "Notes") + table.cursor_type = "row" + self._table_ready = True + self.load_presets() + + # ------------------------------------------------------------------ + # Persistence + # ------------------------------------------------------------------ + + def load_presets(self) -> None: + """Read presets from the JSON file and populate the table.""" + self._presets = [] + + if PRESETS_PATH.exists(): + try: + data = json.loads(PRESETS_PATH.read_text(encoding="utf-8")) + self._presets = data.get("targets", []) + except (json.JSONDecodeError, KeyError): + log.warning("Failed to parse presets file: %s", PRESETS_PATH) + + self._rebuild_table() + + def _write_presets(self) -> None: + """Write the current presets list to the JSON file.""" + PRESETS_PATH.parent.mkdir(parents=True, exist_ok=True) + data = {"targets": self._presets} + PRESETS_PATH.write_text( + json.dumps(data, indent=2, ensure_ascii=False) + "\n", + encoding="utf-8", + ) + + def _rebuild_table(self) -> None: + """Clear and repopulate the DataTable from the in-memory presets list.""" + if not self._table_ready: + return + + table = self.query_one("#preset-table", DataTable) + table.clear() + + for idx, preset in enumerate(self._presets): + name = preset.get("name", f"preset-{idx}") + az = preset.get("az") + el = preset.get("el", 0.0) + notes = preset.get("notes", "") + + az_str = f"{az:.1f}" if az is not None else "---" + el_str = f"{el:.1f}" + + table.add_row(name, az_str, el_str, notes, key=f"preset-{idx}") + + # ------------------------------------------------------------------ + # Public API + # ------------------------------------------------------------------ + + def save_preset( + self, + name: str, + az: float | None, + el: float, + notes: str = "", + ) -> None: + """Append a new preset and persist to disk. + + Args: + name: Human-readable label for this target. + az: Azimuth in degrees, or None for EL-only targets. + el: Elevation in degrees. + notes: Optional description. + """ + entry: dict = { + "name": name, + "az": az, + "el": el, + "notes": notes, + } + self._presets.append(entry) + self._write_presets() + self._rebuild_table() + + def delete_selected(self) -> None: + """Remove the currently highlighted preset row.""" + table = self.query_one("#preset-table", DataTable) + if table.row_count == 0: + return + + row_key = table.cursor_row + if row_key < 0 or row_key >= len(self._presets): + return + + self._presets.pop(row_key) + self._write_presets() + self._rebuild_table() + + def _get_selected_preset(self) -> dict | None: + """Return the preset dict for the currently highlighted row.""" + table = self.query_one("#preset-table", DataTable) + if table.row_count == 0: + return None + + row_key = table.cursor_row + if row_key < 0 or row_key >= len(self._presets): + return None + + return self._presets[row_key] + + @property + def presets(self) -> list[dict]: + """Access the current in-memory presets list.""" + return list(self._presets) + + # ------------------------------------------------------------------ + # Button handlers + # ------------------------------------------------------------------ + + def on_button_pressed(self, event: Button.Pressed) -> None: + button_id = event.button.id or "" + + if button_id == "btn-preset-save": + self._handle_save() + elif button_id == "btn-preset-go": + self._handle_go() + elif button_id == "btn-preset-delete": + self._handle_delete() + + def _handle_save(self) -> None: + """Read the name input and post a SaveRequested message.""" + name_input = self.query_one("#preset-name-input", Input) + name = name_input.value.strip() + if not name: + self.app.notify("Enter a preset name first", severity="warning") + return + self.post_message(self.SaveRequested()) + + def _handle_go(self) -> None: + """Post a GoToPreset message for the selected row.""" + preset = self._get_selected_preset() + if preset is None: + self.app.notify("No preset selected", severity="warning") + return + + az = preset.get("az") + el = preset.get("el", 0.0) + self.post_message(self.GoToPreset(az=az, el=float(el))) + + def _handle_delete(self) -> None: + """Delete the selected preset.""" + preset = self._get_selected_preset() + if preset is None: + self.app.notify("No preset selected", severity="warning") + return + + name = preset.get("name", "?") + self.delete_selected() + self.app.notify(f"Deleted preset: {name}") diff --git a/tui/src/birdcage_tui/widgets/quick_actions.py b/tui/src/birdcage_tui/widgets/quick_actions.py new file mode 100644 index 0000000..a9186f0 --- /dev/null +++ b/tui/src/birdcage_tui/widgets/quick_actions.py @@ -0,0 +1,48 @@ +"""Quick actions widget -- grid of task-oriented action buttons for the Dashboard.""" + +from textual.app import ComposeResult +from textual.containers import Container, Horizontal +from textual.message import Message +from textual.widgets import Button + + +class QuickActions(Container): + """Grid of buttons navigating to relevant screens/modes. + + Posts an ``ActionSelected`` message when a button is pressed. + The parent screen or app handles the actual navigation or confirmation + (e.g., stow requires user confirmation before moving). + """ + + class ActionSelected(Message): + """Posted when the user selects a quick action.""" + + def __init__(self, action: str) -> None: + super().__init__() + self.action = action + + # Action definitions: (id_suffix, label, description) + _ACTIONS: list[tuple[str, str]] = [ + ("point", "Point Dish"), + ("monitor", "Monitor Signal"), + ("scan", "Scan Sky"), + ("stow", "Stow"), + ] + + def compose(self) -> ComposeResult: + with Horizontal(classes="quick-action-row"): + for action_id, label in self._ACTIONS: + yield Button( + label, + id=f"qa-{action_id}", + classes="quick-action-btn", + ) + + def on_button_pressed(self, event: Button.Pressed) -> None: + button_id = event.button.id or "" + if not button_id.startswith("qa-"): + return + + action = button_id.removeprefix("qa-") + self.post_message(self.ActionSelected(action)) + event.stop() diff --git a/tui/src/birdcage_tui/widgets/receiver_info.py b/tui/src/birdcage_tui/widgets/receiver_info.py new file mode 100644 index 0000000..3d86151 --- /dev/null +++ b/tui/src/birdcage_tui/widgets/receiver_info.py @@ -0,0 +1,156 @@ +"""Receiver info widget -- parsed DVB/RF receiver parameters display.""" + +import re + +from rich.text import Text +from textual.widgets import Static + + +class ReceiverInfo(Static): + """Displays parsed DVB receiver parameters from bridge channel/config queries. + + Call ``load_data(channel_params_text, dvb_config_text)`` to update. + Parses the firmware's ``dis`` and ``config`` command output into a + compact, color-coded summary of receiver state. + """ + + def __init__(self, **kwargs) -> None: + super().__init__(**kwargs) + self._freq: str = "---" + self._symrate: str = "---" + self._lnb: str = "---" + self._lock: str = "NO" + self._bcm_id: str = "---" + self._bcm_rev: str = "" + self._bcm_fw: str = "" + + def load_data( + self, + channel_params: str = "", + dvb_config: str = "", + ) -> None: + """Parse raw firmware responses and refresh the display. + + Args: + channel_params: Raw output from DVB ``dis`` command (channel + parameter table with Parameter/Current columns). + dvb_config: Raw output from DVB ``config`` command (BCM + hardware/firmware identification). + """ + self._parse_channel_params(channel_params) + self._parse_dvb_config(dvb_config) + self.refresh() + + def _parse_channel_params(self, text: str) -> None: + """Extract frequency, symbol rate, LNB state, and lock from dis output. + + The ``dis`` command returns a table-formatted output with + "Parameter" and "Current" columns. We extract key-value pairs + by matching known parameter names. + """ + if not text: + return + + # Frequency (kHz) + freq_match = re.search( + r"(?:freq|frequency)\s*[:\|]?\s*(\d+)", text, re.IGNORECASE + ) + if freq_match: + self._freq = f"{freq_match.group(1)} kHz" + + # Symbol rate + sym_match = re.search( + r"(?:sym(?:bol)?[\s_]*rate|ksps)\s*[:\|]?\s*(\S+)", text, re.IGNORECASE + ) + if sym_match: + val = sym_match.group(1) + # May be "blind" for blind scan mode, or a numeric value + if val.lower() in ("blind", "blind_scan", "auto"): + self._symrate = "blind scan" + else: + self._symrate = f"{val} ksps" + + # LNB voltage / polarity + lnb_match = re.search( + r"(?:lnb|polarity|lnbdc)\s*[:\|]?\s*(.+?)(?:\r?\n|$)", text, re.IGNORECASE + ) + if lnb_match: + raw = lnb_match.group(1).strip() + # Interpret voltage as polarity + if "13" in raw: + self._lnb = "ODU 13V (V-pol)" + elif "18" in raw: + self._lnb = "ODU 18V (H-pol)" + elif raw: + self._lnb = raw + + # Lock status + lock_match = re.search(r"lock\s*[:\|]?\s*(\S+)", text, re.IGNORECASE) + if lock_match: + val = lock_match.group(1).upper() + if val in ("1", "YES", "TRUE", "LOCKED"): + self._lock = "YES" + else: + self._lock = "NO" + + def _parse_dvb_config(self, text: str) -> None: + """Extract BCM chip ID, revision, and firmware version from config output. + + The ``config`` command returns lines like: + BCM4515 ID 0x4515 Rev B0 + FW v113.37 + """ + if not text: + return + + # BCM chip ID (e.g., "0x4515") + id_match = re.search(r"(?:ID|BCM)\s*(0x[0-9a-fA-F]+)", text) + if id_match: + self._bcm_id = id_match.group(1) + + # Revision (e.g., "Rev B0") + rev_match = re.search(r"Rev\s+(\S+)", text, re.IGNORECASE) + if rev_match: + self._bcm_rev = rev_match.group(1) + + # Firmware version (e.g., "FW v113.37" or "v113.37") + fw_match = re.search(r"(?:FW\s+)?v(\d+\.\d+)", text) + if fw_match: + self._bcm_fw = f"v{fw_match.group(1)}" + + def render(self) -> Text: + result = Text() + label_w = 10 + + # Frequency + result.append("Freq".ljust(label_w), style="#506878") + result.append(self._freq, style="#c8d0d8") + result.append("\n") + + # Symbol rate + result.append("SymRate".ljust(label_w), style="#506878") + result.append(self._symrate, style="#c8d0d8") + result.append("\n") + + # LNB state + result.append("LNB".ljust(label_w), style="#506878") + result.append(self._lnb, style="#c8d0d8") + result.append("\n") + + # Lock status + result.append("Lock".ljust(label_w), style="#506878") + if self._lock == "YES": + result.append("YES", style="#00e060 bold") + else: + result.append("NO", style="#e04040") + result.append("\n") + + # BCM identification + result.append("BCM".ljust(label_w), style="#506878") + result.append(self._bcm_id, style="#00d4aa") + if self._bcm_rev: + result.append(f" Rev {self._bcm_rev}", style="#c8d0d8") + if self._bcm_fw: + result.append(f" FW {self._bcm_fw}", style="#c8d0d8") + + return result diff --git a/tui/src/birdcage_tui/widgets/status_strip.py b/tui/src/birdcage_tui/widgets/status_strip.py new file mode 100644 index 0000000..ebcd64f --- /dev/null +++ b/tui/src/birdcage_tui/widgets/status_strip.py @@ -0,0 +1,111 @@ +"""Status strip -- persistent 1-row connection/position/signal bar.""" + +from rich.text import Text +from textual.reactive import reactive +from textual.widgets import Static + +# Firmware prompt → display color mapping. +_MENU_COLORS: dict[str, str] = { + "TRK>": "#00d4aa", + "MOT>": "#00e060", + "DVB>": "#2080d0", + "NVS>": "#e8a020", + "A3981>": "#00b8c8", + "STEP>": "#40c0a0", + "EE>": "#e8a020", + "OS>": "#8090a0", + "ADC>": "#00b8c8", + "GPIO>": "#40c0a0", + "PEAK>": "#e8c020", +} + + +class StatusStrip(Static): + """Persistent status bar showing connection, position, signal, and motor state. + + Docked below the header on every screen. Updated by the app-level + position poll and signal monitors. + """ + + connected: reactive[bool] = reactive(False) + demo: reactive[bool] = reactive(False) + port: reactive[str] = reactive("/dev/ttyUSB0") + azimuth: reactive[float] = reactive(0.0) + elevation: reactive[float] = reactive(0.0) + rssi: reactive[int] = reactive(-1) # -1 means no data + motor_state: reactive[str] = reactive("IDLE") + fw_menu: reactive[str] = reactive("TRK>") + + def render(self) -> Text: + result = Text() + sep = Text(" \u2502 ", style="#1a2a38") + + # Connection status + if self.demo: + result.append(" DEMO", style="#e8a020 bold italic") + elif self.connected: + result.append(" CONNECTED", style="#00e060 bold") + result.append(f" {self.port}", style="#506878") + else: + result.append(" OFFLINE", style="#e04040 bold") + + result.append_text(sep) + + # Position + result.append("AZ ", style="#506878") + result.append(f"{self.azimuth:7.2f}", style="#00d4aa bold") + result.append(" EL ", style="#506878") + result.append(f"{self.elevation:6.2f}", style="#00d4aa bold") + + result.append_text(sep) + + # Signal (RSSI) + if self.rssi >= 0: + result.append("RSSI ", style="#506878") + result.append(f"{self.rssi}", style="#00b8c8 bold") + else: + result.append("RSSI ", style="#384858") + result.append("---", style="#384858") + + result.append_text(sep) + + # Motor state + state = self.motor_state + if state == "MOVING": + result.append(state, style="#e8c020 bold") + elif state == "ENGAGED": + result.append(state, style="#00e060") + else: + result.append(state, style="#506878") + + result.append_text(sep) + + # Firmware context + menu_color = _MENU_COLORS.get(self.fw_menu, "#506878") + result.append(self.fw_menu, style=f"{menu_color}") + + return result + + def watch_connected(self, _value: bool) -> None: + self.refresh() + + def watch_demo(self, _value: bool) -> None: + self.refresh() + + def watch_port(self, _value: str) -> None: + self.refresh() + + def watch_azimuth(self, _value: float) -> None: + self.refresh() + + def watch_elevation(self, _value: float) -> None: + self.refresh() + + def watch_rssi(self, _value: int) -> None: + self.refresh() + + def watch_motor_state(self, _value: str) -> None: + self.refresh() + + def watch_fw_menu(self, _value: str) -> None: + self.refresh() diff --git a/tui/src/birdcage_tui/widgets/sweep_plot.py b/tui/src/birdcage_tui/widgets/sweep_plot.py new file mode 100644 index 0000000..a64bf5f --- /dev/null +++ b/tui/src/birdcage_tui/widgets/sweep_plot.py @@ -0,0 +1,157 @@ +"""Sweep plot widget -- 1D azimuth-vs-RSSI bar chart for signal sweep visualization.""" + +from collections import deque + +from rich.text import Text +from textual.widgets import Static + +# 8-level vertical block characters for bar rendering. +# Index 0 = lowest bar, index 7 = tallest bar. +_BLOCKS = "\u2581\u2582\u2583\u2584\u2585\u2586\u2587\u2588" + +# RSSI color thresholds matching signal_gauge.py / sky_heatmap.py +_THRESHOLDS: list[tuple[float, str]] = [ + (500.0, "#2080d0"), # cold -- noise floor + (1000.0, "#00b8c8"), # cool -- weak signal + (2000.0, "#00e060"), # mid -- usable + (3000.0, "#e8c020"), # warm -- strong + (4096.0, "#e04040"), # hot -- saturating +] + +# Maximum display width in columns. AZ range is mapped to fit within this. +MAX_DISPLAY_WIDTH = 60 + + +def _rssi_color(rssi: float) -> str: + """Return the color string for a given RSSI value.""" + if rssi <= 0: + return "#1a2a38" + for threshold, color in _THRESHOLDS: + if rssi <= threshold: + return color + return _THRESHOLDS[-1][1] + + +class SweepPlot(Static): + """1D vertical bar chart showing RSSI at each AZ position. + + X-axis = azimuth positions, Y-axis = RSSI intensity (8-level Unicode blocks). + Similar to a spectrum analyzer display but in angular domain. + + Each column represents one azimuth measurement point, rendered as a stacked + block character whose height encodes RSSI strength and whose color encodes + the signal gradient (blue < cyan < green < yellow < red). + + Methods: + clear: Reset all measurement data. + add_point: Add an AZ/RSSI measurement point. + set_active: Highlight the current sweep position. + """ + + def __init__(self, **kwargs) -> None: + super().__init__(**kwargs) + # Ordered measurement data: (az, rssi) pairs + self._points: deque[tuple[float, float]] = deque(maxlen=MAX_DISPLAY_WIDTH) + self._active_az: float | None = None + + def clear(self) -> None: + """Reset all data and clear the active position.""" + self._points.clear() + self._active_az = None + self.refresh() + + def add_point(self, az: float, rssi: float) -> None: + """Add a measurement point and refresh the display. + + Args: + az: Azimuth angle in degrees. + rssi: Raw RSSI ADC count (0-4096). + """ + self._points.append((az, rssi)) + self.refresh() + + def set_active(self, az: float) -> None: + """Highlight the current sweep position and refresh. + + Args: + az: Azimuth angle in degrees of the active scan position. + """ + self._active_az = az + self.refresh() + + def render(self) -> Text: + result = Text() + + # Title line + result.append("AZ Sweep", style="#506878 bold") + + if not self._points: + result.append("\n") + result.append(_BLOCKS[0] * MAX_DISPLAY_WIDTH, style="#1a2a38") + result.append("\n") + result.append("no data", style="#384858") + return result + + points = list(self._points) + rssi_values = [rssi for _, rssi in points] + az_values = [az for az, _ in points] + + # Find peak + peak_rssi = max(rssi_values) + peak_idx = rssi_values.index(peak_rssi) + peak_az = az_values[peak_idx] + + # Normalize RSSI to 0-7 block index range + lo = min(rssi_values) + hi = max(rssi_values) + span = hi - lo + + # Render the bar chart row + result.append("\n") + for az, rssi in points: + is_active = self._active_az is not None and abs(az - self._active_az) < 0.05 + + if is_active: + # Active position: bright white marker + result.append(_BLOCKS[7], style="#ffffff bold") + else: + # Compute block level from RSSI + if span <= 0: + idx = 3 + else: + normalized = (rssi - lo) / span + idx = min(int(normalized * 7.999), 7) + color = _rssi_color(rssi) + result.append(_BLOCKS[idx], style=color) + + # Pad remaining width with low blocks if fewer points than max width + remaining = MAX_DISPLAY_WIDTH - len(points) + if remaining > 0: + result.append(_BLOCKS[0] * remaining, style="#1a2a38") + + # AZ axis labels + result.append("\n") + if len(az_values) >= 2: + az_lo = az_values[0] + az_hi = az_values[-1] + lo_label = f"{az_lo:.1f}\u00b0" + hi_label = f"{az_hi:.1f}\u00b0" + gap = MAX_DISPLAY_WIDTH - len(lo_label) - len(hi_label) + result.append(lo_label, style="#506878") + if gap > 0: + result.append(" " * gap) + result.append(hi_label, style="#506878") + elif len(az_values) == 1: + result.append(f"{az_values[0]:.1f}\u00b0", style="#506878") + + # Peak indicator line + result.append("\n") + result.append("Peak at ", style="#506878") + result.append(f"AZ={peak_az:.1f}", style="#00d4aa bold") + result.append(" RSSI=", style="#506878") + result.append(f"{peak_rssi:.0f}", style=_rssi_color(peak_rssi)) + + # Point count + result.append(f" ({len(points)} pts)", style="#384858") + + return result diff --git a/tui/src/birdcage_tui/widgets/system_health.py b/tui/src/birdcage_tui/widgets/system_health.py new file mode 100644 index 0000000..6e545c3 --- /dev/null +++ b/tui/src/birdcage_tui/widgets/system_health.py @@ -0,0 +1,195 @@ +"""System health panel widget -- compact multi-line diagnostics for the Dashboard.""" + +import re + +from rich.text import Text +from textual.widgets import Static + + +class SystemHealthPanel(Static): + """Multi-line styled text showing A3981 diag, firmware ID, motor life. + + Populated by calling ``load_data()`` with raw firmware response strings. + Parses and formats hardware diagnostics into a compact, color-coded + summary suitable for the Dashboard overview. + """ + + def __init__(self, **kwargs) -> None: + super().__init__(**kwargs) + self._diag: str = "" + self._torque: str = "" + self._fw_id: str = "" + self._motor_life: str = "" + self._el_limits: dict[str, float] = {"min": 0.0, "max": 0.0} + + def load_data( + self, + diag: str = "", + torque: str = "", + fw_id: str = "", + motor_life: str = "", + el_limits: dict[str, float] | None = None, + ) -> None: + """Update all health fields from raw firmware responses and refresh. + + Args: + diag: Raw A3981 ``diag`` response (e.g., "AZ DIAG: OK EL DIAG: OK"). + torque: Raw A3981 ``st`` response (e.g., "AZ Torq:LOW EL Torq:LOW"). + fw_id: Raw OS ``id`` response with NVS version, system ID, chip info. + motor_life: Raw MOT ``life`` response with usage statistics. + el_limits: Parsed EL limits dict with "min" and "max" keys (degrees). + """ + self._diag = diag + self._torque = torque + self._fw_id = fw_id + self._motor_life = motor_life + if el_limits is not None: + self._el_limits = el_limits + self.refresh() + + def render(self) -> Text: + result = Text() + label_w = 10 + + # A3981 diagnostic row + result.append("A3981".ljust(label_w), style="#506878") + az_diag, el_diag = _parse_diag(self._diag) + result.append("AZ ", style="#506878") + result.append(az_diag, style=_diag_style(az_diag)) + result.append(" EL ", style="#506878") + result.append(el_diag, style=_diag_style(el_diag)) + result.append("\n") + + # Torque row + result.append("Torque".ljust(label_w), style="#506878") + az_torque, el_torque = _parse_torque(self._torque) + result.append("AZ ", style="#506878") + result.append(az_torque, style=_torque_style(az_torque)) + result.append(" EL ", style="#506878") + result.append(el_torque, style=_torque_style(el_torque)) + result.append("\n") + + # Firmware identification row + fw_ver, mcu, ant_id = _parse_fw_id(self._fw_id) + result.append("FW".ljust(label_w), style="#506878") + result.append(fw_ver, style="#c8d0d8") + if mcu: + result.append(" MCU: ", style="#506878") + result.append(mcu, style="#c8d0d8") + if ant_id: + result.append(" Ant: ", style="#506878") + result.append(ant_id, style="#c8d0d8") + result.append("\n") + + # EL range + motor life row + result.append("EL Range".ljust(label_w), style="#506878") + el_min = self._el_limits.get("min", 0.0) + el_max = self._el_limits.get("max", 0.0) + result.append(f"{el_min:.1f}\u00b0", style="#c8d0d8") + result.append(" \u2013 ", style="#506878") + result.append(f"{el_max:.1f}\u00b0", style="#c8d0d8") + + az_life, el_life = _parse_motor_life(self._motor_life) + if az_life or el_life: + result.append(" Life: ", style="#506878") + result.append(f"AZ {az_life}", style="#c8d0d8") + result.append(f" EL {el_life}", style="#c8d0d8") + + return result + + +def _parse_diag(text: str) -> tuple[str, str]: + """Extract AZ and EL diagnostic status from A3981 diag response.""" + az = "---" + el = "---" + if not text: + return az, el + + az_match = re.search(r"AZ\s+DIAG:\s*(\w+)", text, re.IGNORECASE) + el_match = re.search(r"EL\s+DIAG:\s*(\w+)", text, re.IGNORECASE) + if az_match: + az = az_match.group(1).upper() + if el_match: + el = el_match.group(1).upper() + return az, el + + +def _parse_torque(text: str) -> tuple[str, str]: + """Extract AZ and EL torque levels from A3981 st response.""" + az = "---" + el = "---" + if not text: + return az, el + + az_match = re.search(r"AZ\s+Torq:(\w+)", text, re.IGNORECASE) + el_match = re.search(r"EL\s+Torq:(\w+)", text, re.IGNORECASE) + if az_match: + az = az_match.group(1).upper() + if el_match: + el = el_match.group(1).upper() + return az, el + + +def _parse_fw_id(text: str) -> tuple[str, str, str]: + """Extract firmware version, MCU type, and antenna ID from OS id response. + + The ``id`` command returns multi-line output including NVS version, + System ID, and chip details. We extract the most relevant fields. + """ + fw_ver = "---" + mcu = "" + ant_id = "" + if not text: + return fw_ver, mcu, ant_id + + # Firmware / NVS version (e.g., "02.02.48" or "NVS Ver: 02.02.48") + ver_match = re.search(r"(\d{2}\.\d{2}\.\d{2,3})", text) + if ver_match: + fw_ver = ver_match.group(1) + + # MCU identification (e.g., "K60" or "Kinetis") + if "K60" in text or "Kinetis" in text: + mcu = "K60 96MHz" + + # Antenna ID (e.g., "12-IN G2" or "Ant ID") + ant_match = re.search(r"Ant\s+ID\s*[-:]\s*(.+?)(?:\r?\n|$)", text, re.IGNORECASE) + if ant_match: + ant_id = ant_match.group(1).strip() + + return fw_ver, mcu, ant_id + + +def _parse_motor_life(text: str) -> tuple[str, str]: + """Extract AZ and EL motor life counters from MOT life response.""" + az = "" + el = "" + if not text: + return az, el + + # Motor life output varies by firmware. Look for numeric counters + # associated with motor 0 (AZ) and motor 1 (EL). + az_match = re.search(r"(?:Motor\s*\[?0\]?|AZ)\D+(\d+)", text, re.IGNORECASE) + el_match = re.search(r"(?:Motor\s*\[?1\]?|EL)\D+(\d+)", text, re.IGNORECASE) + if az_match: + az = az_match.group(1) + if el_match: + el = el_match.group(1) + return az, el + + +def _diag_style(value: str) -> str: + """Return Rich style string for a diagnostic status value.""" + if value == "OK": + return "#00e060 bold" + if value == "FAULT": + return "#e04040 bold" + return "#506878" + + +def _torque_style(value: str) -> str: + """Return Rich style string for a torque level value.""" + if value == "HIGH": + return "#e8c020 bold" + if value == "LOW": + return "#c8d0d8" + return "#506878" diff --git a/tui/src/birdcage_tui/widgets/tracking_panel.py b/tui/src/birdcage_tui/widgets/tracking_panel.py new file mode 100644 index 0000000..1f4a208 --- /dev/null +++ b/tui/src/birdcage_tui/widgets/tracking_panel.py @@ -0,0 +1,164 @@ +"""Tracking panel widget -- rotctld server lifecycle control for satellite tracking.""" + +from rich.text import Text +from textual.app import ComposeResult +from textual.containers import Container, Horizontal +from textual.message import Message +from textual.reactive import reactive +from textual.widgets import Button, Input, Static + + +class TrackingPanel(Container): + """Panel wrapping the RotctldServer lifecycle UI for satellite tracking. + + Displays server status, bind address, client info, move statistics, + and leapfrog state. The actual rotctld server is started and managed + by the parent screen -- this widget is purely the control surface. + """ + + class StartRequested(Message): + """Posted when the user presses Start Server.""" + + def __init__(self, host: str, port: int, min_el: float) -> None: + super().__init__() + self.host = host + self.port = port + self.min_el = min_el + + class StopRequested(Message): + """Posted when the user presses Stop.""" + + def compose(self) -> ComposeResult: + yield TrackingStatus(id="tracking-status") + with Horizontal(classes="tracking-controls"): + yield Button("Start Server", id="btn-track-start", variant="primary") + yield Button("Stop", id="btn-track-stop") + yield Static(" Bind: ", classes="label") + yield Input(value="127.0.0.1", id="track-host-input") + yield Static(":", classes="label") + yield Input(value="4533", id="track-port-input", type="integer") + yield Static(" Min EL: ", classes="label") + yield Input(value="18.0", id="track-minel-input", type="number") + + # ------------------------------------------------------------------ + # Status updates (called by parent screen) + # ------------------------------------------------------------------ + + def set_status( + self, + state: str = "STOPPED", + client: str = "", + moves: int = 0, + rate: float = 0.0, + leapfrog: bool = True, + ) -> None: + """Update the tracking status display. + + Args: + state: One of "STOPPED", "LISTENING", "CONNECTED". + client: Client identification string (e.g., "Gpredict"). + moves: Total move commands received. + rate: Move command rate in commands per second. + leapfrog: Whether leapfrog predictive compensation is active. + """ + status = self.query_one("#tracking-status", TrackingStatus) + status.state = state + status.client = client + status.moves = moves + status.rate = rate + status.leapfrog = leapfrog + + # ------------------------------------------------------------------ + # Button handlers + # ------------------------------------------------------------------ + + def on_button_pressed(self, event: Button.Pressed) -> None: + button_id = event.button.id or "" + + if button_id == "btn-track-start": + self._handle_start() + elif button_id == "btn-track-stop": + self._handle_stop() + + def _handle_start(self) -> None: + """Read bind parameters and post a StartRequested message.""" + host = self.query_one("#track-host-input", Input).value.strip() + if not host: + host = "127.0.0.1" + + try: + port = int(self.query_one("#track-port-input", Input).value) + except ValueError: + self.app.notify("Invalid port number", severity="warning") + return + + try: + min_el = float(self.query_one("#track-minel-input", Input).value) + except ValueError: + min_el = 18.0 + + self.post_message(self.StartRequested(host, port, min_el)) + + def _handle_stop(self) -> None: + """Post a StopRequested message.""" + self.post_message(self.StopRequested()) + + +class TrackingStatus(Static): + """Rich text display of rotctld server state and tracking statistics.""" + + state: reactive[str] = reactive("STOPPED") + client: reactive[str] = reactive("") + moves: reactive[int] = reactive(0) + rate: reactive[float] = reactive(0.0) + leapfrog: reactive[bool] = reactive(True) + + def render(self) -> Text: + result = Text() + label_w = 10 + + # Status row + result.append("Status".ljust(label_w), style="#506878") + if self.state == "CONNECTED": + result.append("CONNECTED", style="#00e060 bold") + elif self.state == "LISTENING": + result.append("LISTENING", style="#e8a020 bold") + else: + result.append("STOPPED", style="#e04040") + result.append("\n") + + # Bind address row (shown as part of status context) + result.append("Client".ljust(label_w), style="#506878") + if self.client: + result.append(self.client, style="#c8d0d8") + else: + result.append("(none)", style="#384858") + result.append("\n") + + # Move statistics row + result.append("Moves".ljust(label_w), style="#506878") + result.append(f"{self.moves}", style="#c8d0d8") + result.append(" Rate: ", style="#506878") + result.append(f"{self.rate:.1f}/s", style="#c8d0d8") + result.append(" Leapfrog: ", style="#506878") + if self.leapfrog: + result.append("ON", style="#00e060 bold") + else: + result.append("OFF", style="#506878") + + return result + + def watch_state(self, _value: str) -> None: + self.refresh() + + def watch_client(self, _value: str) -> None: + self.refresh() + + def watch_moves(self, _value: int) -> None: + self.refresh() + + def watch_rate(self, _value: float) -> None: + self.refresh() + + def watch_leapfrog(self, _value: bool) -> None: + self.refresh() diff --git a/tui/tests/test_sweep_stop.py b/tui/tests/test_sweep_stop.py index 479207a..5bed8a5 100644 --- a/tui/tests/test_sweep_stop.py +++ b/tui/tests/test_sweep_stop.py @@ -134,8 +134,7 @@ async def test_sweep_restart_after_stop(): assert not signal._sweeping, "Sweep still running after polling" assert len(signal._sweep_data) > 0, ( - f"First sweep should have data; " - f"device type={type(signal._device).__name__}" + f"First sweep should have data; device type={type(signal._device).__name__}" ) # Second sweep. diff --git a/tui/uv.lock b/tui/uv.lock index 7e82e23..d6918f4 100644 --- a/tui/uv.lock +++ b/tui/uv.lock @@ -5,7 +5,7 @@ requires-python = ">=3.11" [[package]] name = "birdcage" version = "2026.2.12.1" -source = { directory = "../" } +source = { editable = "../" } dependencies = [ { name = "click" }, { name = "pyserial" }, @@ -26,12 +26,24 @@ dependencies = [ { name = "textual" }, ] +[package.dev-dependencies] +dev = [ + { name = "pytest" }, + { name = "pytest-asyncio" }, +] + [package.metadata] requires-dist = [ - { name = "birdcage", directory = "../" }, + { name = "birdcage", editable = "../" }, { name = "textual", specifier = ">=1.0.0" }, ] +[package.metadata.requires-dev] +dev = [ + { name = "pytest", specifier = ">=9.0.2" }, + { name = "pytest-asyncio", specifier = ">=1.3.0" }, +] + [[package]] name = "click" version = "8.3.1" @@ -53,6 +65,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/d1/d6/3965ed04c63042e047cb6a3e6ed1a63a35087b6a609aa3a15ed8ac56c221/colorama-0.4.6-py2.py3-none-any.whl", hash = "sha256:4f1d9991f5acc0ca119f9d443620b77f9d6b33703e51011c16baf57afb285fc6", size = 25335, upload-time = "2022-10-25T02:36:20.889Z" }, ] +[[package]] +name = "iniconfig" +version = "2.3.0" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/72/34/14ca021ce8e5dfedc35312d08ba8bf51fdd999c576889fc2c24cb97f4f10/iniconfig-2.3.0.tar.gz", hash = "sha256:c76315c77db068650d49c5b56314774a7804df16fee4402c1f19d6d15d8c4730", size = 20503, upload-time = "2025-10-18T21:55:43.219Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/cb/b1/3846dd7f199d53cb17f49cba7e651e9ce294d8497c8c150530ed11865bb8/iniconfig-2.3.0-py3-none-any.whl", hash = "sha256:f631c04d2c48c52b84d0d0549c99ff3859c98df65b3101406327ecc7d53fbf12", size = 7484, upload-time = "2025-10-18T21:55:41.639Z" }, +] + [[package]] name = "linkify-it-py" version = "2.0.3" @@ -103,6 +124,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/b3/38/89ba8ad64ae25be8de66a6d463314cf1eb366222074cfda9ee839c56a4b4/mdurl-0.1.2-py3-none-any.whl", hash = "sha256:84008a41e51615a49fc9966191ff91509e3c40b939176e643fd50a5c2196b8f8", size = 9979, upload-time = "2022-08-14T12:40:09.779Z" }, ] +[[package]] +name = "packaging" +version = "26.0" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/65/ee/299d360cdc32edc7d2cf530f3accf79c4fca01e96ffc950d8a52213bd8e4/packaging-26.0.tar.gz", hash = "sha256:00243ae351a257117b6a241061796684b084ed1c516a08c48a3f7e147a9d80b4", size = 143416, upload-time = "2026-01-21T20:50:39.064Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/b7/b9/c538f279a4e237a006a2c98387d081e9eb060d203d8ed34467cc0f0b9b53/packaging-26.0-py3-none-any.whl", hash = "sha256:b36f1fef9334a5588b4166f8bcd26a14e521f2b55e6b9de3aaa80d3ff7a37529", size = 74366, upload-time = "2026-01-21T20:50:37.788Z" }, +] + [[package]] name = "platformdirs" version = "4.7.0" @@ -112,6 +142,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/cb/e3/1eddccb2c39ecfbe09b3add42a04abcc3fa5b468aa4224998ffb8a7e9c8f/platformdirs-4.7.0-py3-none-any.whl", hash = "sha256:1ed8db354e344c5bb6039cd727f096af975194b508e37177719d562b2b540ee6", size = 18983, upload-time = "2026-02-12T22:21:52.237Z" }, ] +[[package]] +name = "pluggy" +version = "1.6.0" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/f9/e2/3e91f31a7d2b083fe6ef3fa267035b518369d9511ffab804f839851d2779/pluggy-1.6.0.tar.gz", hash = "sha256:7dcc130b76258d33b90f61b658791dede3486c3e6bfb003ee5c9bfb396dd22f3", size = 69412, upload-time = "2025-05-15T12:30:07.975Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/54/20/4d324d65cc6d9205fabedc306948156824eb9f0ee1633355a8f7ec5c66bf/pluggy-1.6.0-py3-none-any.whl", hash = "sha256:e920276dd6813095e9377c0bc5566d94c932c33b27a3e3945d8389c374dd4746", size = 20538, upload-time = "2025-05-15T12:30:06.134Z" }, +] + [[package]] name = "pygments" version = "2.19.2" @@ -130,6 +169,35 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/07/bc/587a445451b253b285629263eb51c2d8e9bcea4fc97826266d186f96f558/pyserial-3.5-py2.py3-none-any.whl", hash = "sha256:c4451db6ba391ca6ca299fb3ec7bae67a5c55dde170964c7a14ceefec02f2cf0", size = 90585, upload-time = "2020-11-23T03:59:13.41Z" }, ] +[[package]] +name = "pytest" +version = "9.0.2" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "colorama", marker = "sys_platform == 'win32'" }, + { name = "iniconfig" }, + { name = "packaging" }, + { name = "pluggy" }, + { name = "pygments" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/d1/db/7ef3487e0fb0049ddb5ce41d3a49c235bf9ad299b6a25d5780a89f19230f/pytest-9.0.2.tar.gz", hash = "sha256:75186651a92bd89611d1d9fc20f0b4345fd827c41ccd5c299a868a05d70edf11", size = 1568901, upload-time = "2025-12-06T21:30:51.014Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/3b/ab/b3226f0bd7cdcf710fbede2b3548584366da3b19b5021e74f5bde2a8fa3f/pytest-9.0.2-py3-none-any.whl", hash = "sha256:711ffd45bf766d5264d487b917733b453d917afd2b0ad65223959f59089f875b", size = 374801, upload-time = "2025-12-06T21:30:49.154Z" }, +] + +[[package]] +name = "pytest-asyncio" +version = "1.3.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "pytest" }, + { name = "typing-extensions", marker = "python_full_version < '3.13'" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/90/2c/8af215c0f776415f3590cac4f9086ccefd6fd463befeae41cd4d3f193e5a/pytest_asyncio-1.3.0.tar.gz", hash = "sha256:d7f52f36d231b80ee124cd216ffb19369aa168fc10095013c6b014a34d3ee9e5", size = 50087, upload-time = "2025-11-10T16:07:47.256Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/e5/35/f8b19922b6a25bc0880171a2f1a003eaeb93657475193ab516fd87cac9da/pytest_asyncio-1.3.0-py3-none-any.whl", hash = "sha256:611e26147c7f77640e6d0a92a38ed17c3e9848063698d5c93d5aa7aa11cebff5", size = 15075, upload-time = "2025-11-10T16:07:45.537Z" }, +] + [[package]] name = "rich" version = "14.3.2"