birdcage/tui/tests/test_track_mode.py
Ryan Malloy a249c98208 Implement Track mode (Phase 6) with rotctld server
TUI-compatible Hamlib rotctld TCP server that works with the
existing SerialBridge/DemoDevice interface. F2 Control's Track
tab now starts/stops a real TCP server on the configured port.
Status callbacks report connection state, move count, and command
rate back to the TrackingPanel via thread-safe call_from_thread.

Includes 7 pilot tests exercising the full protocol path through
asyncio TCP clients against the DemoDevice.
2026-02-15 15:52:49 -07:00

365 lines
11 KiB
Python

"""Test Track mode -- rotctld server lifecycle via the F2 Control screen.
Verifies Start/Stop button wiring, status display updates, and TCP
client interaction through the Hamlib rotctld protocol. Uses DemoDevice
as the backing device.
NOTE: pilot.click() on dock:bottom widgets can miss due to headless
coordinate mapping. We use post_message(Button.Pressed(...)) for reliable
button activation in CI, and pilot.press() for keyboard interaction.
"""
import asyncio
import pytest
from textual.widgets import Button
from birdcage_tui.app import BirdcageApp
from birdcage_tui.screens.control import ControlScreen
from birdcage_tui.widgets.tracking_panel import TrackingPanel, TrackingStatus
def _press_button(btn: Button) -> None:
"""Programmatically press a Button by posting its Pressed message.
Bypasses pilot.click() coordinate issues with docked widgets.
"""
btn.post_message(Button.Pressed(btn))
async def _switch_to_track(pilot) -> None:
"""Navigate to F2 Control > Track sub-mode."""
await pilot.press("f2")
await pilot.pause()
async def _wait_for_listening(app, timeout: float = 3.0) -> None:
"""Poll until the tracking panel shows LISTENING or timeout."""
deadline = asyncio.get_event_loop().time() + timeout
while asyncio.get_event_loop().time() < deadline:
try:
status = app.query_one("#tracking-status", TrackingStatus)
if status.state == "LISTENING":
return
except Exception:
pass
await asyncio.sleep(0.1)
@pytest.mark.asyncio
async def test_start_creates_listening_server():
"""Clicking Start Server should create a TCP server on the port."""
app = BirdcageApp()
app.demo_mode = True
async with app.run_test(size=(120, 40)) as pilot:
await pilot.pause()
await _switch_to_track(pilot)
control = app.query_one("#control", ControlScreen)
panel = app.query_one("#ctrl-tracking-panel", TrackingPanel)
# Switch to Track sub-mode.
control.switch_mode("track")
await pilot.pause()
# Post StartRequested with a known port.
panel.post_message(
TrackingPanel.StartRequested("127.0.0.1", 14533, 18.0)
)
await _wait_for_listening(app)
status = app.query_one("#tracking-status", TrackingStatus)
assert status.state == "LISTENING"
assert control._rotctld_server is not None
# Verify the TCP port is open.
reader, writer = await asyncio.open_connection(
"127.0.0.1", 14533
)
writer.close()
await writer.wait_closed()
# Cleanup.
control._stop_rotctld()
await pilot.pause()
@pytest.mark.asyncio
async def test_stop_shuts_down_server():
"""Clicking Stop should shut down the rotctld server."""
app = BirdcageApp()
app.demo_mode = True
async with app.run_test(size=(120, 40)) as pilot:
await pilot.pause()
await _switch_to_track(pilot)
control = app.query_one("#control", ControlScreen)
panel = app.query_one("#ctrl-tracking-panel", TrackingPanel)
control.switch_mode("track")
await pilot.pause()
# Start.
panel.post_message(
TrackingPanel.StartRequested("127.0.0.1", 14534, 18.0)
)
await _wait_for_listening(app)
assert control._rotctld_server is not None
# Stop.
panel.post_message(TrackingPanel.StopRequested())
await asyncio.sleep(0.5)
assert control._rotctld_server is None
status = app.query_one("#tracking-status", TrackingStatus)
assert status.state == "STOPPED"
# Port should no longer be accepting.
with pytest.raises(OSError):
_r, _w = await asyncio.open_connection(
"127.0.0.1", 14534
)
@pytest.mark.asyncio
async def test_status_display_updates():
"""Status display should reflect server state transitions."""
app = BirdcageApp()
app.demo_mode = True
async with app.run_test(size=(120, 40)) as pilot:
await pilot.pause()
await _switch_to_track(pilot)
control = app.query_one("#control", ControlScreen)
panel = app.query_one("#ctrl-tracking-panel", TrackingPanel)
control.switch_mode("track")
await pilot.pause()
# Initial state.
status = app.query_one("#tracking-status", TrackingStatus)
assert status.state == "STOPPED"
assert status.client == ""
assert status.moves == 0
# Start server.
panel.post_message(
TrackingPanel.StartRequested("127.0.0.1", 14535, 18.0)
)
await _wait_for_listening(app)
assert status.state == "LISTENING"
# Connect a client -- should transition to CONNECTED.
reader, writer = await asyncio.open_connection(
"127.0.0.1", 14535
)
# Send a command so the server accept loop processes it.
writer.write(b"_\n")
await writer.drain()
resp = await asyncio.wait_for(reader.readline(), timeout=3.0)
assert b"Birdcage TUI" in resp
# Give the status callback time to fire.
await asyncio.sleep(0.5)
assert status.state == "CONNECTED"
assert status.client != ""
# Disconnect -- should return to LISTENING.
writer.close()
await writer.wait_closed()
await asyncio.sleep(1.5)
assert status.state == "LISTENING"
# Cleanup.
control._stop_rotctld()
await pilot.pause()
@pytest.mark.asyncio
async def test_rotctld_get_position():
"""The 'p' command should return AZ and EL from the demo device."""
app = BirdcageApp()
app.demo_mode = True
async with app.run_test(size=(120, 40)) as pilot:
await pilot.pause()
await _switch_to_track(pilot)
control = app.query_one("#control", ControlScreen)
panel = app.query_one("#ctrl-tracking-panel", TrackingPanel)
control.switch_mode("track")
await pilot.pause()
panel.post_message(
TrackingPanel.StartRequested("127.0.0.1", 14536, 18.0)
)
await _wait_for_listening(app)
reader, writer = await asyncio.open_connection(
"127.0.0.1", 14536
)
# Query position.
writer.write(b"p\n")
await writer.drain()
az_line = await asyncio.wait_for(
reader.readline(), timeout=3.0
)
el_line = await asyncio.wait_for(
reader.readline(), timeout=3.0
)
az = float(az_line.decode().strip())
el = float(el_line.decode().strip())
# DemoDevice defaults to AZ=180, EL=45.
assert 170.0 < az < 190.0
assert 40.0 < el < 50.0
writer.close()
await writer.wait_closed()
control._stop_rotctld()
await pilot.pause()
@pytest.mark.asyncio
async def test_rotctld_set_position():
"""The 'P' command should move the demo device and increment moves."""
app = BirdcageApp()
app.demo_mode = True
async with app.run_test(size=(120, 40)) as pilot:
await pilot.pause()
await _switch_to_track(pilot)
control = app.query_one("#control", ControlScreen)
panel = app.query_one("#ctrl-tracking-panel", TrackingPanel)
control.switch_mode("track")
await pilot.pause()
panel.post_message(
TrackingPanel.StartRequested("127.0.0.1", 14537, 18.0)
)
await _wait_for_listening(app)
reader, writer = await asyncio.open_connection(
"127.0.0.1", 14537
)
# Move to a specific position.
writer.write(b"P 200.0 55.0\n")
await writer.drain()
resp = await asyncio.wait_for(reader.readline(), timeout=3.0)
assert b"RPRT 0" in resp
# The demo device's target should be updated.
assert app.device._target_az == 200.0
assert app.device._target_el == 55.0
# Send another move.
writer.write(b"P 210.0 60.0\n")
await writer.drain()
resp = await asyncio.wait_for(reader.readline(), timeout=3.0)
assert b"RPRT 0" in resp
# Wait for status callback.
await asyncio.sleep(0.5)
status = app.query_one("#tracking-status", TrackingStatus)
assert status.moves >= 2
writer.close()
await writer.wait_closed()
control._stop_rotctld()
await pilot.pause()
@pytest.mark.asyncio
async def test_rotctld_quit_command():
"""The 'q' command should close the client connection gracefully."""
app = BirdcageApp()
app.demo_mode = True
async with app.run_test(size=(120, 40)) as pilot:
await pilot.pause()
await _switch_to_track(pilot)
control = app.query_one("#control", ControlScreen)
panel = app.query_one("#ctrl-tracking-panel", TrackingPanel)
control.switch_mode("track")
await pilot.pause()
panel.post_message(
TrackingPanel.StartRequested("127.0.0.1", 14538, 18.0)
)
await _wait_for_listening(app)
reader, writer = await asyncio.open_connection(
"127.0.0.1", 14538
)
writer.write(b"q\n")
await writer.drain()
# Server should close our connection.
data = await asyncio.wait_for(reader.read(1024), timeout=3.0)
assert data == b"" # EOF
writer.close()
await writer.wait_closed()
# Server should still be listening for new connections.
await asyncio.sleep(0.5)
status = app.query_one("#tracking-status", TrackingStatus)
assert status.state == "LISTENING"
control._stop_rotctld()
await pilot.pause()
@pytest.mark.asyncio
async def test_start_button_wiring():
"""Pressing Start Server button should start the server."""
app = BirdcageApp()
app.demo_mode = True
async with app.run_test(size=(120, 40)) as pilot:
await pilot.pause()
await _switch_to_track(pilot)
control = app.query_one("#control", ControlScreen)
control.switch_mode("track")
await pilot.pause()
# Set a unique port in the input.
from textual.widgets import Input
port_input = app.query_one("#track-port-input", Input)
port_input.value = "14539"
await pilot.pause()
# Press Start Server button.
start_btn = app.query_one("#btn-track-start", Button)
_press_button(start_btn)
await _wait_for_listening(app)
assert control._rotctld_server is not None
status = app.query_one("#tracking-status", TrackingStatus)
assert status.state == "LISTENING"
# Press Stop button.
stop_btn = app.query_one("#btn-track-stop", Button)
_press_button(stop_btn)
await asyncio.sleep(0.5)
assert control._rotctld_server is None
assert status.state == "STOPPED"