From a249c982085eca21b4553850ef6a6abeeda6389f Mon Sep 17 00:00:00 2001 From: Ryan Malloy Date: Sun, 15 Feb 2026 15:52:49 -0700 Subject: [PATCH] Implement Track mode (Phase 6) with rotctld server TUI-compatible Hamlib rotctld TCP server that works with the existing SerialBridge/DemoDevice interface. F2 Control's Track tab now starts/stops a real TCP server on the configured port. Status callbacks report connection state, move count, and command rate back to the TrackingPanel via thread-safe call_from_thread. Includes 7 pilot tests exercising the full protocol path through asyncio TCP clients against the DemoDevice. --- tui/scripts/take_screenshots.py | 34 +++ tui/src/birdcage_tui/rotctld_server.py | 227 +++++++++++++++ tui/src/birdcage_tui/screens/control.py | 73 ++++- tui/tests/test_track_mode.py | 364 ++++++++++++++++++++++++ 4 files changed, 689 insertions(+), 9 deletions(-) create mode 100644 tui/scripts/take_screenshots.py create mode 100644 tui/src/birdcage_tui/rotctld_server.py create mode 100644 tui/tests/test_track_mode.py diff --git a/tui/scripts/take_screenshots.py b/tui/scripts/take_screenshots.py new file mode 100644 index 0000000..05b1bf2 --- /dev/null +++ b/tui/scripts/take_screenshots.py @@ -0,0 +1,34 @@ +"""Take screenshots of all TUI screens in demo mode for documentation.""" + +import asyncio + +from birdcage_tui.app import BirdcageApp + +SCREENS = { + "f1": "dashboard", + "f2": "control", + "f3": "signal", + "f4": "system", +} + +OUT_DIR = "/home/rpm/claude/ham/satellite/winegard-travler/site/public/screenshots" + + +async def main(): + for key, name in SCREENS.items(): + app = BirdcageApp() + app.demo_mode = True + + async with app.run_test(size=(120, 40)) as pilot: + await pilot.pause() + await pilot.press(key) + await pilot.pause() + # Let workers populate data. + await asyncio.sleep(1.5) + + path = f"{OUT_DIR}/tui-{name}.svg" + app.save_screenshot(path) + print(f"Saved {path}") + + +asyncio.run(main()) diff --git a/tui/src/birdcage_tui/rotctld_server.py b/tui/src/birdcage_tui/rotctld_server.py new file mode 100644 index 0000000..4dedc8e --- /dev/null +++ b/tui/src/birdcage_tui/rotctld_server.py @@ -0,0 +1,227 @@ +"""Lightweight rotctld TCP server for the Birdcage TUI. + +Implements the Hamlib rotctld wire protocol (p/P/S/_/q) using the TUI's +device interface (SerialBridge or DemoDevice) instead of BirdcageAntenna. +Runs in a background thread with status callbacks to the TUI event loop. +""" + +import contextlib +import logging +import socket +import threading +import time +from collections.abc import Callable +from dataclasses import dataclass, field + +logger = logging.getLogger(__name__) + +MODEL_NAME = "Birdcage TUI" + + +@dataclass +class TrackingState: + status: str = "STOPPED" + client: str = "" + moves: int = 0 + rate: float = 0.0 + _move_timestamps: list[float] = field( + default_factory=list, repr=False + ) + + def record_move(self) -> None: + self.moves += 1 + now = time.monotonic() + self._move_timestamps.append(now) + # Keep only last 30 seconds of timestamps for rate calc. + cutoff = now - 30.0 + self._move_timestamps = [ + t for t in self._move_timestamps if t > cutoff + ] + elapsed = now - self._move_timestamps[0] + if elapsed > 0: + self.rate = len(self._move_timestamps) / elapsed + else: + self.rate = 0.0 + + +class TuiRotctldServer: + """TCP server speaking Hamlib rotctld, backed by a TUI device. + + Args: + device: SerialBridge or DemoDevice instance. + host: Bind address. + port: Bind port. + on_status: Callback invoked on state changes. Called from the + server thread -- caller is responsible for thread safety + (e.g., app.call_from_thread). + """ + + def __init__( + self, + device: object, + host: str = "127.0.0.1", + port: int = 4533, + on_status: Callable[[TrackingState], None] | None = None, + ) -> None: + self._device = device + self._host = host + self._port = port + self._on_status = on_status + self._server_socket: socket.socket | None = None + self._running = False + self._state = TrackingState() + self._lock = threading.Lock() + + @property + def state(self) -> TrackingState: + return self._state + + def _notify(self) -> None: + if self._on_status: + self._on_status(self._state) + + def serve_forever(self) -> None: + """Listen for connections and handle rotctld commands. + + Blocks until stop() is called. + """ + self._server_socket = socket.socket( + socket.AF_INET, socket.SOCK_STREAM + ) + self._server_socket.setsockopt( + socket.SOL_SOCKET, socket.SO_REUSEADDR, 1 + ) + try: + self._server_socket.bind((self._host, self._port)) + except OSError: + logger.error( + "Failed to bind %s:%d", self._host, self._port + ) + self._state.status = "STOPPED" + self._notify() + return + + self._server_socket.listen(1) + self._server_socket.settimeout(1.0) + self._running = True + + self._state.status = "LISTENING" + self._state.client = "" + self._state.moves = 0 + self._state.rate = 0.0 + self._notify() + + logger.info( + "TUI rotctld listening on %s:%d", + self._host, + self._port, + ) + + while self._running: + try: + conn, addr = self._server_socket.accept() + except TimeoutError: + continue + except OSError: + break + + addr_str = f"{addr[0]}:{addr[1]}" + logger.info("rotctld client connected: %s", addr_str) + self._state.status = "CONNECTED" + self._state.client = addr_str + self._notify() + + try: + self._handle_connection(conn) + except Exception: + logger.exception( + "Error handling rotctld client %s", addr_str + ) + finally: + conn.close() + logger.info( + "rotctld client disconnected: %s", addr_str + ) + if self._running: + self._state.status = "LISTENING" + self._state.client = "" + self._notify() + + self._state.status = "STOPPED" + self._state.client = "" + self._notify() + + def stop(self) -> None: + self._running = False + if self._server_socket: + with contextlib.suppress(OSError): + self._server_socket.close() + self._server_socket = None + + def _handle_connection(self, conn: socket.socket) -> None: + conn.settimeout(1.0) + while self._running: + try: + data = conn.recv(1024) + except TimeoutError: + continue + except OSError: + break + + if not data: + break + + for line in data.decode("utf-8", errors="replace").splitlines(): + line = line.strip() + if not line: + continue + + parts = line.split() + cmd = parts[0] + + if cmd == "p": + self._cmd_get_position(conn) + elif cmd == "P": + self._cmd_set_position(conn, parts) + elif cmd == "S": + conn.sendall(b"RPRT 0\n") + return + elif cmd == "_": + conn.sendall( + f"{MODEL_NAME}\n".encode() + ) + elif cmd == "q": + return + else: + logger.warning( + "Unknown rotctld command: %s", cmd + ) + conn.sendall(b"RPRT -1\n") + + def _cmd_get_position(self, conn: socket.socket) -> None: + try: + pos = self._device.get_position() + az = pos["azimuth"] + el = pos["elevation"] + conn.sendall(f"{az}\n{el}\n".encode()) + except Exception: + logger.exception("Failed to get position for rotctld") + conn.sendall(b"RPRT -1\n") + + def _cmd_set_position( + self, conn: socket.socket, parts: list[str] + ) -> None: + try: + target_az = float(parts[1]) + target_el = float(parts[2]) + self._device.move_motor(0, target_az) + self._device.move_motor(1, target_el) + self._state.record_move() + self._notify() + conn.sendall(b"RPRT 0\n") + except (IndexError, ValueError): + logger.error("Bad P command: %s", parts) + conn.sendall(b"RPRT -1\n") + except Exception: + logger.exception("Failed to move for rotctld") + conn.sendall(b"RPRT -1\n") diff --git a/tui/src/birdcage_tui/screens/control.py b/tui/src/birdcage_tui/screens/control.py index c5a42c9..08751eb 100644 --- a/tui/src/birdcage_tui/screens/control.py +++ b/tui/src/birdcage_tui/screens/control.py @@ -7,6 +7,7 @@ mode manages saved AZ/EL targets. Track mode wraps the rotctld server lifecycle. import contextlib import logging +import threading from textual import work from textual.app import ComposeResult @@ -15,6 +16,7 @@ from textual.containers import Container, Horizontal, Vertical from textual.widgets import Button, ContentSwitcher, Input, Static from textual.worker import Worker +from birdcage_tui.rotctld_server import TrackingState, TuiRotctldServer from birdcage_tui.widgets.compass_rose import CompassRose from birdcage_tui.widgets.mode_bar import ModeBar from birdcage_tui.widgets.motor_status import MotorStatus @@ -52,6 +54,8 @@ class ControlScreen(Container): self._last_az = 180.0 self._last_el = 45.0 self._step_size: float = 1.0 + self._rotctld_server: TuiRotctldServer | None = None + self._rotctld_thread: threading.Thread | None = None # ------------------------------------------------------------------ # Compose @@ -157,8 +161,9 @@ class ControlScreen(Container): self._start_data_poll() def on_unmount(self) -> None: - """Stop polling thread on teardown.""" + """Stop polling thread and rotctld server on teardown.""" self._polling = False + self._stop_rotctld() # ------------------------------------------------------------------ # Mode switching @@ -460,29 +465,79 @@ class ControlScreen(Container): def on_tracking_panel_start_requested( self, event: TrackingPanel.StartRequested ) -> None: - """Handle rotctld start request (placeholder for Phase 6).""" + if self._device is None: + self.app.notify( + "No device connected", severity="warning" + ) + return + + if self._rotctld_server is not None: + self.app.notify( + "Server already running", severity="warning" + ) + return + log.info( "Tracking start requested: %s:%d min_el=%.1f", event.host, event.port, event.min_el, ) - panel = self.query_one("#ctrl-tracking-panel", TrackingPanel) - panel.set_status(state="LISTENING") - self.app.notify( - f"Tracking server listening on {event.host}:{event.port}", - severity="information", + + def status_callback(state: TrackingState) -> None: + self.app.call_from_thread( + self._apply_tracking_state, state + ) + + self._rotctld_server = TuiRotctldServer( + device=self._device, + host=event.host, + port=event.port, + on_status=status_callback, ) + self._rotctld_thread = threading.Thread( + target=self._rotctld_server.serve_forever, + name="rotctld", + daemon=True, + ) + self._rotctld_thread.start() def on_tracking_panel_stop_requested( self, _event: TrackingPanel.StopRequested ) -> None: - """Handle rotctld stop request (placeholder for Phase 6).""" - log.info("Tracking stop requested") + self._stop_rotctld() + + def _stop_rotctld(self) -> None: + if self._rotctld_server is None: + return + + log.info("Stopping rotctld server") + self._rotctld_server.stop() + + if self._rotctld_thread is not None: + self._rotctld_thread.join(timeout=3.0) + self._rotctld_thread = None + + self._rotctld_server = None + panel = self.query_one("#ctrl-tracking-panel", TrackingPanel) panel.set_status(state="STOPPED") self.app.notify("Tracking server stopped") + def _apply_tracking_state(self, state: TrackingState) -> None: + try: + panel = self.query_one( + "#ctrl-tracking-panel", TrackingPanel + ) + panel.set_status( + state=state.status, + client=state.client, + moves=state.moves, + rate=state.rate, + ) + except Exception: + pass + # ------------------------------------------------------------------ # Key binding actions # ------------------------------------------------------------------ diff --git a/tui/tests/test_track_mode.py b/tui/tests/test_track_mode.py new file mode 100644 index 0000000..d40562d --- /dev/null +++ b/tui/tests/test_track_mode.py @@ -0,0 +1,364 @@ +"""Test Track mode -- rotctld server lifecycle via the F2 Control screen. + +Verifies Start/Stop button wiring, status display updates, and TCP +client interaction through the Hamlib rotctld protocol. Uses DemoDevice +as the backing device. + +NOTE: pilot.click() on dock:bottom widgets can miss due to headless +coordinate mapping. We use post_message(Button.Pressed(...)) for reliable +button activation in CI, and pilot.press() for keyboard interaction. +""" + +import asyncio + +import pytest +from textual.widgets import Button + +from birdcage_tui.app import BirdcageApp +from birdcage_tui.screens.control import ControlScreen +from birdcage_tui.widgets.tracking_panel import TrackingPanel, TrackingStatus + + +def _press_button(btn: Button) -> None: + """Programmatically press a Button by posting its Pressed message. + + Bypasses pilot.click() coordinate issues with docked widgets. + """ + btn.post_message(Button.Pressed(btn)) + + +async def _switch_to_track(pilot) -> None: + """Navigate to F2 Control > Track sub-mode.""" + await pilot.press("f2") + await pilot.pause() + + +async def _wait_for_listening(app, timeout: float = 3.0) -> None: + """Poll until the tracking panel shows LISTENING or timeout.""" + deadline = asyncio.get_event_loop().time() + timeout + while asyncio.get_event_loop().time() < deadline: + try: + status = app.query_one("#tracking-status", TrackingStatus) + if status.state == "LISTENING": + return + except Exception: + pass + await asyncio.sleep(0.1) + + +@pytest.mark.asyncio +async def test_start_creates_listening_server(): + """Clicking Start Server should create a TCP server on the port.""" + app = BirdcageApp() + app.demo_mode = True + + async with app.run_test(size=(120, 40)) as pilot: + await pilot.pause() + await _switch_to_track(pilot) + + control = app.query_one("#control", ControlScreen) + panel = app.query_one("#ctrl-tracking-panel", TrackingPanel) + + # Switch to Track sub-mode. + control.switch_mode("track") + await pilot.pause() + + # Post StartRequested with a known port. + panel.post_message( + TrackingPanel.StartRequested("127.0.0.1", 14533, 18.0) + ) + await _wait_for_listening(app) + + status = app.query_one("#tracking-status", TrackingStatus) + assert status.state == "LISTENING" + assert control._rotctld_server is not None + + # Verify the TCP port is open. + reader, writer = await asyncio.open_connection( + "127.0.0.1", 14533 + ) + writer.close() + await writer.wait_closed() + + # Cleanup. + control._stop_rotctld() + await pilot.pause() + + +@pytest.mark.asyncio +async def test_stop_shuts_down_server(): + """Clicking Stop should shut down the rotctld server.""" + app = BirdcageApp() + app.demo_mode = True + + async with app.run_test(size=(120, 40)) as pilot: + await pilot.pause() + await _switch_to_track(pilot) + + control = app.query_one("#control", ControlScreen) + panel = app.query_one("#ctrl-tracking-panel", TrackingPanel) + + control.switch_mode("track") + await pilot.pause() + + # Start. + panel.post_message( + TrackingPanel.StartRequested("127.0.0.1", 14534, 18.0) + ) + await _wait_for_listening(app) + assert control._rotctld_server is not None + + # Stop. + panel.post_message(TrackingPanel.StopRequested()) + await asyncio.sleep(0.5) + + assert control._rotctld_server is None + + status = app.query_one("#tracking-status", TrackingStatus) + assert status.state == "STOPPED" + + # Port should no longer be accepting. + with pytest.raises(OSError): + _r, _w = await asyncio.open_connection( + "127.0.0.1", 14534 + ) + + +@pytest.mark.asyncio +async def test_status_display_updates(): + """Status display should reflect server state transitions.""" + app = BirdcageApp() + app.demo_mode = True + + async with app.run_test(size=(120, 40)) as pilot: + await pilot.pause() + await _switch_to_track(pilot) + + control = app.query_one("#control", ControlScreen) + panel = app.query_one("#ctrl-tracking-panel", TrackingPanel) + + control.switch_mode("track") + await pilot.pause() + + # Initial state. + status = app.query_one("#tracking-status", TrackingStatus) + assert status.state == "STOPPED" + assert status.client == "" + assert status.moves == 0 + + # Start server. + panel.post_message( + TrackingPanel.StartRequested("127.0.0.1", 14535, 18.0) + ) + await _wait_for_listening(app) + assert status.state == "LISTENING" + + # Connect a client -- should transition to CONNECTED. + reader, writer = await asyncio.open_connection( + "127.0.0.1", 14535 + ) + # Send a command so the server accept loop processes it. + writer.write(b"_\n") + await writer.drain() + resp = await asyncio.wait_for(reader.readline(), timeout=3.0) + assert b"Birdcage TUI" in resp + + # Give the status callback time to fire. + await asyncio.sleep(0.5) + assert status.state == "CONNECTED" + assert status.client != "" + + # Disconnect -- should return to LISTENING. + writer.close() + await writer.wait_closed() + await asyncio.sleep(1.5) + assert status.state == "LISTENING" + + # Cleanup. + control._stop_rotctld() + await pilot.pause() + + +@pytest.mark.asyncio +async def test_rotctld_get_position(): + """The 'p' command should return AZ and EL from the demo device.""" + app = BirdcageApp() + app.demo_mode = True + + async with app.run_test(size=(120, 40)) as pilot: + await pilot.pause() + await _switch_to_track(pilot) + + control = app.query_one("#control", ControlScreen) + panel = app.query_one("#ctrl-tracking-panel", TrackingPanel) + + control.switch_mode("track") + await pilot.pause() + + panel.post_message( + TrackingPanel.StartRequested("127.0.0.1", 14536, 18.0) + ) + await _wait_for_listening(app) + + reader, writer = await asyncio.open_connection( + "127.0.0.1", 14536 + ) + + # Query position. + writer.write(b"p\n") + await writer.drain() + + az_line = await asyncio.wait_for( + reader.readline(), timeout=3.0 + ) + el_line = await asyncio.wait_for( + reader.readline(), timeout=3.0 + ) + + az = float(az_line.decode().strip()) + el = float(el_line.decode().strip()) + + # DemoDevice defaults to AZ=180, EL=45. + assert 170.0 < az < 190.0 + assert 40.0 < el < 50.0 + + writer.close() + await writer.wait_closed() + control._stop_rotctld() + await pilot.pause() + + +@pytest.mark.asyncio +async def test_rotctld_set_position(): + """The 'P' command should move the demo device and increment moves.""" + app = BirdcageApp() + app.demo_mode = True + + async with app.run_test(size=(120, 40)) as pilot: + await pilot.pause() + await _switch_to_track(pilot) + + control = app.query_one("#control", ControlScreen) + panel = app.query_one("#ctrl-tracking-panel", TrackingPanel) + + control.switch_mode("track") + await pilot.pause() + + panel.post_message( + TrackingPanel.StartRequested("127.0.0.1", 14537, 18.0) + ) + await _wait_for_listening(app) + + reader, writer = await asyncio.open_connection( + "127.0.0.1", 14537 + ) + + # Move to a specific position. + writer.write(b"P 200.0 55.0\n") + await writer.drain() + resp = await asyncio.wait_for(reader.readline(), timeout=3.0) + assert b"RPRT 0" in resp + + # The demo device's target should be updated. + assert app.device._target_az == 200.0 + assert app.device._target_el == 55.0 + + # Send another move. + writer.write(b"P 210.0 60.0\n") + await writer.drain() + resp = await asyncio.wait_for(reader.readline(), timeout=3.0) + assert b"RPRT 0" in resp + + # Wait for status callback. + await asyncio.sleep(0.5) + + status = app.query_one("#tracking-status", TrackingStatus) + assert status.moves >= 2 + + writer.close() + await writer.wait_closed() + control._stop_rotctld() + await pilot.pause() + + +@pytest.mark.asyncio +async def test_rotctld_quit_command(): + """The 'q' command should close the client connection gracefully.""" + app = BirdcageApp() + app.demo_mode = True + + async with app.run_test(size=(120, 40)) as pilot: + await pilot.pause() + await _switch_to_track(pilot) + + control = app.query_one("#control", ControlScreen) + panel = app.query_one("#ctrl-tracking-panel", TrackingPanel) + + control.switch_mode("track") + await pilot.pause() + + panel.post_message( + TrackingPanel.StartRequested("127.0.0.1", 14538, 18.0) + ) + await _wait_for_listening(app) + + reader, writer = await asyncio.open_connection( + "127.0.0.1", 14538 + ) + + writer.write(b"q\n") + await writer.drain() + + # Server should close our connection. + data = await asyncio.wait_for(reader.read(1024), timeout=3.0) + assert data == b"" # EOF + + writer.close() + await writer.wait_closed() + + # Server should still be listening for new connections. + await asyncio.sleep(0.5) + status = app.query_one("#tracking-status", TrackingStatus) + assert status.state == "LISTENING" + + control._stop_rotctld() + await pilot.pause() + + +@pytest.mark.asyncio +async def test_start_button_wiring(): + """Pressing Start Server button should start the server.""" + app = BirdcageApp() + app.demo_mode = True + + async with app.run_test(size=(120, 40)) as pilot: + await pilot.pause() + await _switch_to_track(pilot) + + control = app.query_one("#control", ControlScreen) + control.switch_mode("track") + await pilot.pause() + + # Set a unique port in the input. + from textual.widgets import Input + + port_input = app.query_one("#track-port-input", Input) + port_input.value = "14539" + await pilot.pause() + + # Press Start Server button. + start_btn = app.query_one("#btn-track-start", Button) + _press_button(start_btn) + await _wait_for_listening(app) + + assert control._rotctld_server is not None + status = app.query_one("#tracking-status", TrackingStatus) + assert status.state == "LISTENING" + + # Press Stop button. + stop_btn = app.query_one("#btn-track-stop", Button) + _press_button(stop_btn) + await asyncio.sleep(0.5) + + assert control._rotctld_server is None + assert status.state == "STOPPED"