"""Birdcage TUI — main application shell. Horizontal tab bar (F1-F4) with persistent StatusStrip, ContentSwitcher for four task-oriented screens, and F5 console overlay. App-level position polling feeds the StatusStrip regardless of which tab is active. """ import argparse import contextlib import logging import threading from textual import work from textual.app import App, ComposeResult from textual.binding import Binding from textual.containers import Horizontal from textual.widgets import Button, ContentSwitcher, Footer, Header from birdcage_tui.screens.console import ConsoleOverlay from birdcage_tui.screens.control import ControlScreen from birdcage_tui.screens.dashboard import DashboardScreen from birdcage_tui.screens.signal import SignalScreen from birdcage_tui.screens.system import SystemScreen from birdcage_tui.widgets.status_strip import StatusStrip log = logging.getLogger(__name__) TABS: dict[str, tuple[str, type]] = { "dashboard": ("F1 Dashboard", DashboardScreen), "control": ("F2 Control", ControlScreen), "signal": ("F3 Signal", SignalScreen), "system": ("F4 System", SystemScreen), } class BirdcageApp(App): """Textual application for Winegard satellite dish control.""" TITLE = "Birdcage" CSS_PATH = "theme.tcss" BINDINGS = [ Binding("f1", "switch_tab('dashboard')", "Dashboard"), Binding("f2", "switch_tab('control')", "Control"), Binding("f3", "switch_tab('signal')", "Signal"), Binding("f4", "switch_tab('system')", "System"), Binding("f5", "toggle_console", "Console"), Binding("q", "quit", "Quit"), Binding("d", "toggle_dark", "Dark"), ] # Set from CLI args before run() demo_mode: bool = False serial_port: str = "/dev/ttyUSB0" firmware_name: str = "g2" skip_init: bool = False device: object = None shutdown_event: threading.Event = threading.Event() # App-level position cache (updated by background poll). _current_az: float = 0.0 _current_el: float = 0.0 _prev_az: float = 0.0 _prev_el: float = 0.0 _console_visible: bool = False @property def SUB_TITLE(self) -> str: # noqa: N802 if self.demo_mode: return "DEMO" return self.serial_port def compose(self) -> ComposeResult: yield Header() yield StatusStrip(id="status-strip") with Horizontal(id="tab-bar"): for tab_key, (label, _) in TABS.items(): yield Button(label, id=f"tab-{tab_key}", classes="tab-btn") with ContentSwitcher(id="content-area", initial="dashboard"): for tab_key, (_, screen_cls) in TABS.items(): yield screen_cls(id=tab_key) yield Footer() def on_mount(self) -> None: # Fresh event per instance — class-level default is shared across instances. self.shutdown_event = threading.Event() self.query_one("#tab-dashboard").add_class("active") self._setup_device() # ------------------------------------------------------------------ # Device lifecycle # ------------------------------------------------------------------ def _setup_device(self) -> None: """Create device (demo or real) and hand it to each screen.""" if self.demo_mode: from birdcage_tui.demo import DemoDevice self.device = DemoDevice() self.device.connect() else: from birdcage.protocol import get_protocol from birdcage_tui.bridge import SerialBridge protocol = get_protocol(self.firmware_name) self.device = SerialBridge(protocol) self.device.connect(self.serial_port) if not self.skip_init: self.run_worker(self._initialize_device, thread=True) self._distribute_device() self._update_status_strip_connection() self._install_console() self._start_position_poll() async def _initialize_device(self) -> None: """Run device init in a worker thread (blocks on serial I/O).""" try: self.device.initialize() except Exception: log.exception("Device initialization failed") self.notify("Init failed -- check serial connection", severity="error") def _distribute_device(self) -> None: """Pass the device reference to every screen that wants it.""" for tab_key in TABS: screen = self.query_one(f"#{tab_key}") if hasattr(screen, "set_device"): screen.set_device(self.device) def _update_status_strip_connection(self) -> None: """Set the status strip's connection info from current device.""" strip = self.query_one("#status-strip", StatusStrip) is_demo = type(self.device).__name__ == "DemoDevice" if self.device else False strip.demo = is_demo or self.demo_mode strip.connected = ( self.device is not None and getattr(self.device, "is_connected", False) and not strip.demo ) strip.port = self.serial_port def _install_console(self) -> None: """Pre-install the console overlay so it persists across open/close.""" self.install_screen(ConsoleOverlay(), name="console-overlay") # ------------------------------------------------------------------ # App-level position poll # ------------------------------------------------------------------ @work(thread=True, exclusive=True, group="app-position-poll") def _start_position_poll(self) -> None: """Poll device at ~2 Hz for position, update StatusStrip globally.""" shutdown = self.shutdown_event while not shutdown.is_set(): if self.device is None: shutdown.wait(0.5) continue try: pos = self.device.get_position() az = pos["azimuth"] el = pos["elevation"] self._current_az = az self._current_el = el self.call_from_thread(self._update_position, az, el) except Exception: log.debug("App position poll failed", exc_info=True) shutdown.wait(0.5) def _update_position(self, az: float, el: float) -> None: """Push position to StatusStrip and active screen.""" strip = self.query_one("#status-strip", StatusStrip) strip.azimuth = az strip.elevation = el # Detect movement from position delta delta = abs(az - self._prev_az) + abs(el - self._prev_el) if delta > 0.05: strip.motor_state = "MOVING" else: strip.motor_state = "IDLE" self._prev_az = az self._prev_el = el # Notify active screen switcher = self.query_one("#content-area", ContentSwitcher) if switcher.current: try: active = self.query_one(f"#{switcher.current}") if hasattr(active, "on_position_update"): active.on_position_update(az, el) except Exception: pass # ------------------------------------------------------------------ # Tab switching # ------------------------------------------------------------------ def action_switch_tab(self, tab: str) -> None: """Switch the content area to *tab* and update tab bar highlight.""" switcher = self.query_one("#content-area", ContentSwitcher) switcher.current = tab for btn in self.query(".tab-btn"): btn.remove_class("active") self.query_one(f"#tab-{tab}").add_class("active") screen = self.query_one(f"#{tab}") if hasattr(screen, "on_show"): screen.on_show() # ------------------------------------------------------------------ # Console overlay # ------------------------------------------------------------------ def action_toggle_console(self) -> None: """Push or pop the console overlay.""" if self._console_visible: # Pop the console -- dismiss triggers the callback try: self.pop_screen() except Exception: self._console_visible = False else: self.push_screen("console-overlay", callback=self._on_console_dismissed) self._console_visible = True def _on_console_dismissed(self, _result=None) -> None: """Called when the console overlay is dismissed.""" self._console_visible = False # ------------------------------------------------------------------ # Tab bar button handling # ------------------------------------------------------------------ def on_button_pressed(self, event: Button.Pressed) -> None: button_id = event.button.id or "" if button_id.startswith("tab-"): tab = button_id.removeprefix("tab-") if tab in TABS: self.action_switch_tab(tab) # ------------------------------------------------------------------ # QuickActions navigation (from Dashboard) # ------------------------------------------------------------------ def on_quick_actions_action_selected(self, event) -> None: """Handle navigation requests from the Dashboard's quick actions.""" action = event.action if action == "point": self.action_switch_tab("control") elif action == "monitor": self.action_switch_tab("signal") elif action == "scan": self.action_switch_tab("signal") # Tell the signal screen to switch to skymap mode try: signal_screen = self.query_one("#signal") if hasattr(signal_screen, "switch_mode"): signal_screen.switch_mode("skymap") except Exception: pass elif action == "stow": self._do_stow() def _do_stow(self) -> None: """Move dish to stow position (0, 65).""" if self.device is None: self.notify("No device connected", severity="warning") return self.notify("Stowing dish to AZ=0 EL=65...", severity="information") self._run_stow() @work(thread=True) def _run_stow(self) -> None: """Execute stow in a worker thread.""" try: self.device.move_to(0.0, 65.0) self.call_from_thread(self.notify, "Stow command sent") except Exception: log.exception("Stow failed") self.call_from_thread(self.notify, "Stow failed", severity="error") # ------------------------------------------------------------------ # Lifecycle # ------------------------------------------------------------------ def on_unmount(self) -> None: """Signal all worker threads to exit and disconnect the device.""" self.shutdown_event.set() if self.device and hasattr(self.device, "disconnect"): self.device.disconnect() def main() -> None: parser = argparse.ArgumentParser( description="Birdcage TUI -- Satellite Dish Control" ) parser.add_argument("--demo", action="store_true", help="Run with simulated device") parser.add_argument("--port", default="/dev/ttyUSB0", help="Serial port") parser.add_argument( "--firmware", default="g2", choices=["g2", "hal205", "hal000"], help="Firmware version", ) parser.add_argument( "--skip-init", action="store_true", help="Skip firmware initialization" ) args = parser.parse_args() app = BirdcageApp() app.demo_mode = args.demo app.serial_port = args.port app.firmware_name = args.firmware app.skip_init = args.skip_init try: app.run() except KeyboardInterrupt: pass finally: app.shutdown_event.set() with contextlib.suppress(Exception): if app.device and hasattr(app.device, "disconnect"): app.device.disconnect()