Ryan Malloy 16ca4892b3 Promote bridge, demo, craft_client to core birdcage package
Move bridge.py, demo.py, craft_client.py from tui/src/birdcage_tui/ to
src/birdcage/ so both TUI and MCP server can share the device layer
without a circular dependency on textual.
2026-02-17 16:01:38 -07:00

396 lines
15 KiB
Python

"""Birdcage TUI — main application shell.
Horizontal tab bar (F1-F4) with persistent StatusStrip, ContentSwitcher
for four task-oriented screens, and F5 console overlay. App-level position
polling feeds the StatusStrip regardless of which tab is active.
"""
import argparse
import contextlib
import logging
import threading
from textual import work
from textual.app import App, ComposeResult
from textual.binding import Binding
from textual.containers import Horizontal
from textual.widgets import Button, ContentSwitcher, Footer, Header
from birdcage_tui.screens.camera import CameraOverlay
from birdcage_tui.screens.console import ConsoleOverlay
from birdcage_tui.screens.control import ControlScreen
from birdcage_tui.screens.dashboard import DashboardScreen
from birdcage_tui.screens.signal import SignalScreen
from birdcage_tui.screens.system import SystemScreen
from birdcage_tui.widgets.status_strip import StatusStrip
log = logging.getLogger(__name__)
TABS: dict[str, tuple[str, type]] = {
"dashboard": ("F1 Dashboard", DashboardScreen),
"control": ("F2 Control", ControlScreen),
"signal": ("F3 Signal", SignalScreen),
"system": ("F4 System", SystemScreen),
}
class BirdcageApp(App):
"""Textual application for Winegard satellite dish control."""
TITLE = "Birdcage"
CSS_PATH = "theme.tcss"
BINDINGS = [
Binding("f1", "switch_tab('dashboard')", "Dashboard"),
Binding("f2", "switch_tab('control')", "Control"),
Binding("f3", "switch_tab('signal')", "Signal"),
Binding("f4", "switch_tab('system')", "System"),
Binding("f5", "toggle_console", "Console"),
Binding("f6", "toggle_camera", "Camera"),
Binding("q", "quit", "Quit"),
Binding("d", "toggle_dark", "Dark"),
]
# Set from CLI args before run()
demo_mode: bool = False
serial_port: str = "/dev/ttyUSB0"
firmware_name: str = "g2"
skip_init: bool = False
craft_url: str = "https://space.warehack.ing"
capture_dir: str = "captures"
camera_device: str = "auto"
device: object = None
shutdown_event: threading.Event = threading.Event()
# App-level position cache (updated by background poll).
_current_az: float = 0.0
_current_el: float = 0.0
_prev_az: float = 0.0
_prev_el: float = 0.0
_console_visible: bool = False
_camera_visible: bool = False
_pass_detector: object = None # PassEventDetector, set by CameraOverlay
@property
def SUB_TITLE(self) -> str: # noqa: N802
tag = "a generic AZ/EL positioner that doesn't care about wavelength"
if self.demo_mode:
return f"{tag} · DEMO"
return f"{tag} · {self.serial_port}"
def compose(self) -> ComposeResult:
yield Header()
yield StatusStrip(id="status-strip")
with Horizontal(id="tab-bar"):
for tab_key, (label, _) in TABS.items():
yield Button(label, id=f"tab-{tab_key}", classes="tab-btn")
with ContentSwitcher(id="content-area", initial="dashboard"):
for tab_key, (_, screen_cls) in TABS.items():
yield screen_cls(id=tab_key)
yield Footer()
def on_mount(self) -> None:
# Fresh event per instance — class-level default is shared across instances.
self.shutdown_event = threading.Event()
self.query_one("#tab-dashboard").add_class("active")
self._setup_device()
# ------------------------------------------------------------------
# Device lifecycle
# ------------------------------------------------------------------
def _setup_device(self) -> None:
"""Create device (demo or real) and hand it to each screen."""
if self.demo_mode:
from birdcage.demo import DemoDevice
self.device = DemoDevice()
self.device.connect()
else:
from birdcage.bridge import SerialBridge
from birdcage.protocol import get_protocol
protocol = get_protocol(self.firmware_name)
self.device = SerialBridge(protocol)
self.device.connect(self.serial_port)
if not self.skip_init:
self.run_worker(self._initialize_device, thread=True)
self._distribute_device()
self._setup_craft_client()
self._update_status_strip_connection()
self._install_console()
self._install_camera()
self._start_position_poll()
async def _initialize_device(self) -> None:
"""Run device init in a worker thread (blocks on serial I/O)."""
try:
self.device.initialize()
except Exception:
log.exception("Device initialization failed")
self.notify("Init failed -- check serial connection", severity="error")
def _distribute_device(self) -> None:
"""Pass the device reference to every screen that wants it."""
for tab_key in TABS:
screen = self.query_one(f"#{tab_key}")
if hasattr(screen, "set_device"):
screen.set_device(self.device)
def _setup_craft_client(self) -> None:
"""Create a Craft API client and hand it to the control screen."""
if self.demo_mode:
from birdcage.demo import DemoCraftClient
client = DemoCraftClient()
else:
from birdcage.craft_client import CraftClient
client = CraftClient(base_url=self.craft_url)
try:
control = self.query_one("#control")
if hasattr(control, "set_craft_client"):
control.set_craft_client(client)
except Exception:
log.debug("Could not set craft client on control screen")
def _update_status_strip_connection(self) -> None:
"""Set the status strip's connection info from current device."""
strip = self.query_one("#status-strip", StatusStrip)
is_demo = type(self.device).__name__ == "DemoDevice" if self.device else False
strip.demo = is_demo or self.demo_mode
strip.connected = (
self.device is not None
and getattr(self.device, "is_connected", False)
and not strip.demo
)
strip.port = self.serial_port
def _install_console(self) -> None:
"""Pre-install the console overlay so it persists across open/close."""
self.install_screen(ConsoleOverlay(), name="console-overlay")
def _install_camera(self) -> None:
"""Pre-install the camera overlay so it persists across open/close."""
self.install_screen(CameraOverlay(), name="camera-overlay")
# ------------------------------------------------------------------
# App-level position poll
# ------------------------------------------------------------------
@work(thread=True, exclusive=True, group="app-position-poll")
def _start_position_poll(self) -> None:
"""Poll device at ~2 Hz for position, update StatusStrip globally."""
shutdown = self.shutdown_event
while not shutdown.is_set():
if self.device is None:
shutdown.wait(0.5)
continue
try:
pos = self.device.get_position()
az = pos["azimuth"]
el = pos["elevation"]
self._current_az = az
self._current_el = el
self.call_from_thread(self._update_position, az, el)
except Exception:
log.debug("App position poll failed", exc_info=True)
shutdown.wait(0.5)
def _update_position(self, az: float, el: float) -> None:
"""Push position to StatusStrip and active screen."""
strip = self.query_one("#status-strip", StatusStrip)
strip.azimuth = az
strip.elevation = el
# Detect movement from position delta
delta = abs(az - self._prev_az) + abs(el - self._prev_el)
if delta > 0.05:
strip.motor_state = "MOVING"
else:
strip.motor_state = "IDLE"
self._prev_az = az
self._prev_el = el
# Notify active screen
switcher = self.query_one("#content-area", ContentSwitcher)
if switcher.current:
try:
active = self.query_one(f"#{switcher.current}")
if hasattr(active, "on_position_update"):
active.on_position_update(az, el)
except Exception:
pass
# ------------------------------------------------------------------
# Tab switching
# ------------------------------------------------------------------
def action_switch_tab(self, tab: str) -> None:
"""Switch the content area to *tab* and update tab bar highlight."""
switcher = self.query_one("#content-area", ContentSwitcher)
switcher.current = tab
for btn in self.query(".tab-btn"):
btn.remove_class("active")
self.query_one(f"#tab-{tab}").add_class("active")
screen = self.query_one(f"#{tab}")
if hasattr(screen, "on_show"):
screen.on_show()
# ------------------------------------------------------------------
# Console overlay
# ------------------------------------------------------------------
def action_toggle_console(self) -> None:
"""Push or pop the console overlay."""
if self._console_visible:
# Pop the console -- dismiss triggers the callback
try:
self.pop_screen()
except Exception:
self._console_visible = False
else:
self.push_screen("console-overlay", callback=self._on_console_dismissed)
self._console_visible = True
def _on_console_dismissed(self, _result=None) -> None:
"""Called when the console overlay is dismissed."""
self._console_visible = False
# ------------------------------------------------------------------
# Camera overlay
# ------------------------------------------------------------------
def action_toggle_camera(self) -> None:
"""Push or pop the camera capture overlay."""
if self._camera_visible:
try:
self.pop_screen()
except Exception:
self._camera_visible = False
else:
self.push_screen("camera-overlay", callback=self._on_camera_dismissed)
self._camera_visible = True
def _on_camera_dismissed(self, _result=None) -> None:
"""Called when the camera overlay is dismissed."""
self._camera_visible = False
# ------------------------------------------------------------------
# Tab bar button handling
# ------------------------------------------------------------------
def on_button_pressed(self, event: Button.Pressed) -> None:
button_id = event.button.id or ""
if button_id.startswith("tab-"):
tab = button_id.removeprefix("tab-")
if tab in TABS:
self.action_switch_tab(tab)
# ------------------------------------------------------------------
# QuickActions navigation (from Dashboard)
# ------------------------------------------------------------------
def on_quick_actions_action_selected(self, event) -> None:
"""Handle navigation requests from the Dashboard's quick actions."""
action = event.action
if action == "point":
self.action_switch_tab("control")
elif action == "monitor":
self.action_switch_tab("signal")
elif action == "scan":
self.action_switch_tab("signal")
# Tell the signal screen to switch to skymap mode
try:
signal_screen = self.query_one("#signal")
if hasattr(signal_screen, "switch_mode"):
signal_screen.switch_mode("skymap")
except Exception:
pass
elif action == "stow":
self._do_stow()
def _do_stow(self) -> None:
"""Move dish to stow position (0, 65)."""
if self.device is None:
self.notify("No device connected", severity="warning")
return
self.notify("Stowing dish to AZ=0 EL=65...", severity="information")
self._run_stow()
@work(thread=True)
def _run_stow(self) -> None:
"""Execute stow in a worker thread."""
try:
self.device.move_to(0.0, 65.0)
self.call_from_thread(self.notify, "Stow command sent")
except Exception:
log.exception("Stow failed")
self.call_from_thread(self.notify, "Stow failed", severity="error")
# ------------------------------------------------------------------
# Lifecycle
# ------------------------------------------------------------------
def on_unmount(self) -> None:
"""Signal all worker threads to exit and disconnect the device."""
self.shutdown_event.set()
if self.device and hasattr(self.device, "disconnect"):
self.device.disconnect()
def main() -> None:
parser = argparse.ArgumentParser(
description="Birdcage TUI -- Satellite Dish Control"
)
parser.add_argument("--demo", action="store_true", help="Run with simulated device")
parser.add_argument("--port", default="/dev/ttyUSB0", help="Serial port")
parser.add_argument(
"--firmware",
default="g2",
choices=["g2", "hal205", "hal000"],
help="Firmware version",
)
parser.add_argument(
"--skip-init", action="store_true", help="Skip firmware initialization"
)
parser.add_argument(
"--craft-url",
default="https://space.warehack.ing",
help="Craft API base URL",
)
parser.add_argument(
"--capture-dir",
default="captures",
help="Output directory for camera captures",
)
parser.add_argument(
"--camera-device",
default="auto",
help="Camera device (e.g., /dev/video0) or 'auto'",
)
args = parser.parse_args()
app = BirdcageApp()
app.demo_mode = args.demo
app.serial_port = args.port
app.firmware_name = args.firmware
app.skip_init = args.skip_init
app.craft_url = args.craft_url
app.capture_dir = args.capture_dir
app.camera_device = args.camera_device
try:
app.run()
except KeyboardInterrupt:
pass
finally:
app.shutdown_event.set()
with contextlib.suppress(Exception):
if app.device and hasattr(app.device, "disconnect"):
app.device.disconnect()