New F2 Control sub-mode that searches the Craft orbital catalog (22k+ objects), displays pass predictions, and drives the dish in real-time using server-computed AZ/EL positions from /api/sky/up. Tracking loop polls at ~1Hz, filters for the tracked target by target_type + target_id (str, not int — handles satellites, planets, stars, comets), and issues motor commands through the existing serial bridge. Verified end-to-end with Carryout G2 hardware tracking NOAA 17. New files: - craft_client.py — stdlib HTTP client (urllib only, no deps) - widgets/craft_panel.py — search table, pass info, tracking status - tests/test_craft_mode.py — 5 unit tests with mocked API - tests/test_craft_integration.py — 3 hardware integration tests
337 lines
11 KiB
Python
337 lines
11 KiB
Python
"""Test Track mode -- rotctld server lifecycle via the F2 Control screen.
|
|
|
|
Verifies Start/Stop button wiring, status display updates, and TCP
|
|
client interaction through the Hamlib rotctld protocol. Uses DemoDevice
|
|
as the backing device.
|
|
|
|
NOTE: pilot.click() on dock:bottom widgets can miss due to headless
|
|
coordinate mapping. We use post_message(Button.Pressed(...)) for reliable
|
|
button activation in CI, and pilot.press() for keyboard interaction.
|
|
"""
|
|
|
|
import asyncio
|
|
|
|
import pytest
|
|
from textual.widgets import Button
|
|
|
|
from birdcage_tui.app import BirdcageApp
|
|
from birdcage_tui.screens.control import ControlScreen
|
|
from birdcage_tui.widgets.tracking_panel import TrackingPanel, TrackingStatus
|
|
|
|
|
|
def _press_button(btn: Button) -> None:
|
|
"""Programmatically press a Button by posting its Pressed message.
|
|
|
|
Bypasses pilot.click() coordinate issues with docked widgets.
|
|
"""
|
|
btn.post_message(Button.Pressed(btn))
|
|
|
|
|
|
async def _switch_to_track(pilot) -> None:
|
|
"""Navigate to F2 Control > Track sub-mode."""
|
|
await pilot.press("f2")
|
|
await pilot.pause()
|
|
|
|
|
|
async def _wait_for_listening(app, timeout: float = 3.0) -> None:
|
|
"""Poll until the tracking panel shows LISTENING or timeout."""
|
|
deadline = asyncio.get_event_loop().time() + timeout
|
|
while asyncio.get_event_loop().time() < deadline:
|
|
try:
|
|
status = app.query_one("#tracking-status", TrackingStatus)
|
|
if status.state == "LISTENING":
|
|
return
|
|
except Exception:
|
|
pass
|
|
await asyncio.sleep(0.1)
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_start_creates_listening_server():
|
|
"""Clicking Start Server should create a TCP server on the port."""
|
|
app = BirdcageApp()
|
|
app.demo_mode = True
|
|
|
|
async with app.run_test(size=(120, 40)) as pilot:
|
|
await pilot.pause()
|
|
await _switch_to_track(pilot)
|
|
|
|
control = app.query_one("#control", ControlScreen)
|
|
panel = app.query_one("#ctrl-tracking-panel", TrackingPanel)
|
|
|
|
# Switch to Track sub-mode.
|
|
control.switch_mode("track")
|
|
await pilot.pause()
|
|
|
|
# Post StartRequested with a known port.
|
|
panel.post_message(TrackingPanel.StartRequested("127.0.0.1", 14533, 18.0))
|
|
await _wait_for_listening(app)
|
|
|
|
status = app.query_one("#tracking-status", TrackingStatus)
|
|
assert status.state == "LISTENING"
|
|
assert control._rotctld_server is not None
|
|
|
|
# Verify the TCP port is open.
|
|
reader, writer = await asyncio.open_connection("127.0.0.1", 14533)
|
|
writer.close()
|
|
await writer.wait_closed()
|
|
|
|
# Cleanup.
|
|
control._stop_rotctld()
|
|
await pilot.pause()
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_stop_shuts_down_server():
|
|
"""Clicking Stop should shut down the rotctld server."""
|
|
app = BirdcageApp()
|
|
app.demo_mode = True
|
|
|
|
async with app.run_test(size=(120, 40)) as pilot:
|
|
await pilot.pause()
|
|
await _switch_to_track(pilot)
|
|
|
|
control = app.query_one("#control", ControlScreen)
|
|
panel = app.query_one("#ctrl-tracking-panel", TrackingPanel)
|
|
|
|
control.switch_mode("track")
|
|
await pilot.pause()
|
|
|
|
# Start.
|
|
panel.post_message(TrackingPanel.StartRequested("127.0.0.1", 14534, 18.0))
|
|
await _wait_for_listening(app)
|
|
assert control._rotctld_server is not None
|
|
|
|
# Stop.
|
|
panel.post_message(TrackingPanel.StopRequested())
|
|
await asyncio.sleep(0.5)
|
|
|
|
assert control._rotctld_server is None
|
|
|
|
status = app.query_one("#tracking-status", TrackingStatus)
|
|
assert status.state == "STOPPED"
|
|
|
|
# Port should no longer be accepting.
|
|
with pytest.raises(OSError):
|
|
_r, _w = await asyncio.open_connection("127.0.0.1", 14534)
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_status_display_updates():
|
|
"""Status display should reflect server state transitions."""
|
|
app = BirdcageApp()
|
|
app.demo_mode = True
|
|
|
|
async with app.run_test(size=(120, 40)) as pilot:
|
|
await pilot.pause()
|
|
await _switch_to_track(pilot)
|
|
|
|
control = app.query_one("#control", ControlScreen)
|
|
panel = app.query_one("#ctrl-tracking-panel", TrackingPanel)
|
|
|
|
control.switch_mode("track")
|
|
await pilot.pause()
|
|
|
|
# Initial state.
|
|
status = app.query_one("#tracking-status", TrackingStatus)
|
|
assert status.state == "STOPPED"
|
|
assert status.client == ""
|
|
assert status.moves == 0
|
|
|
|
# Start server.
|
|
panel.post_message(TrackingPanel.StartRequested("127.0.0.1", 14535, 18.0))
|
|
await _wait_for_listening(app)
|
|
assert status.state == "LISTENING"
|
|
|
|
# Connect a client -- should transition to CONNECTED.
|
|
reader, writer = await asyncio.open_connection("127.0.0.1", 14535)
|
|
# Send a command so the server accept loop processes it.
|
|
writer.write(b"_\n")
|
|
await writer.drain()
|
|
resp = await asyncio.wait_for(reader.readline(), timeout=3.0)
|
|
assert b"Birdcage TUI" in resp
|
|
|
|
# Give the status callback time to fire.
|
|
await asyncio.sleep(0.5)
|
|
assert status.state == "CONNECTED"
|
|
assert status.client != ""
|
|
|
|
# Disconnect -- should return to LISTENING.
|
|
writer.close()
|
|
await writer.wait_closed()
|
|
await asyncio.sleep(1.5)
|
|
assert status.state == "LISTENING"
|
|
|
|
# Cleanup.
|
|
control._stop_rotctld()
|
|
await pilot.pause()
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_rotctld_get_position():
|
|
"""The 'p' command should return AZ and EL from the demo device."""
|
|
app = BirdcageApp()
|
|
app.demo_mode = True
|
|
|
|
async with app.run_test(size=(120, 40)) as pilot:
|
|
await pilot.pause()
|
|
await _switch_to_track(pilot)
|
|
|
|
control = app.query_one("#control", ControlScreen)
|
|
panel = app.query_one("#ctrl-tracking-panel", TrackingPanel)
|
|
|
|
control.switch_mode("track")
|
|
await pilot.pause()
|
|
|
|
panel.post_message(TrackingPanel.StartRequested("127.0.0.1", 14536, 18.0))
|
|
await _wait_for_listening(app)
|
|
|
|
reader, writer = await asyncio.open_connection("127.0.0.1", 14536)
|
|
|
|
# Query position.
|
|
writer.write(b"p\n")
|
|
await writer.drain()
|
|
|
|
az_line = await asyncio.wait_for(reader.readline(), timeout=3.0)
|
|
el_line = await asyncio.wait_for(reader.readline(), timeout=3.0)
|
|
|
|
az = float(az_line.decode().strip())
|
|
el = float(el_line.decode().strip())
|
|
|
|
# DemoDevice defaults to AZ=180, EL=45.
|
|
assert 170.0 < az < 190.0
|
|
assert 40.0 < el < 50.0
|
|
|
|
writer.close()
|
|
await writer.wait_closed()
|
|
control._stop_rotctld()
|
|
await pilot.pause()
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_rotctld_set_position():
|
|
"""The 'P' command should move the demo device and increment moves."""
|
|
app = BirdcageApp()
|
|
app.demo_mode = True
|
|
|
|
async with app.run_test(size=(120, 40)) as pilot:
|
|
await pilot.pause()
|
|
await _switch_to_track(pilot)
|
|
|
|
control = app.query_one("#control", ControlScreen)
|
|
panel = app.query_one("#ctrl-tracking-panel", TrackingPanel)
|
|
|
|
control.switch_mode("track")
|
|
await pilot.pause()
|
|
|
|
panel.post_message(TrackingPanel.StartRequested("127.0.0.1", 14537, 18.0))
|
|
await _wait_for_listening(app)
|
|
|
|
reader, writer = await asyncio.open_connection("127.0.0.1", 14537)
|
|
|
|
# Move to a specific position.
|
|
writer.write(b"P 200.0 55.0\n")
|
|
await writer.drain()
|
|
resp = await asyncio.wait_for(reader.readline(), timeout=3.0)
|
|
assert b"RPRT 0" in resp
|
|
|
|
# The demo device's target should be updated.
|
|
assert app.device._target_az == 200.0
|
|
assert app.device._target_el == 55.0
|
|
|
|
# Send another move.
|
|
writer.write(b"P 210.0 60.0\n")
|
|
await writer.drain()
|
|
resp = await asyncio.wait_for(reader.readline(), timeout=3.0)
|
|
assert b"RPRT 0" in resp
|
|
|
|
# Wait for status callback.
|
|
await asyncio.sleep(0.5)
|
|
|
|
status = app.query_one("#tracking-status", TrackingStatus)
|
|
assert status.moves >= 2
|
|
|
|
writer.close()
|
|
await writer.wait_closed()
|
|
control._stop_rotctld()
|
|
await pilot.pause()
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_rotctld_quit_command():
|
|
"""The 'q' command should close the client connection gracefully."""
|
|
app = BirdcageApp()
|
|
app.demo_mode = True
|
|
|
|
async with app.run_test(size=(120, 40)) as pilot:
|
|
await pilot.pause()
|
|
await _switch_to_track(pilot)
|
|
|
|
control = app.query_one("#control", ControlScreen)
|
|
panel = app.query_one("#ctrl-tracking-panel", TrackingPanel)
|
|
|
|
control.switch_mode("track")
|
|
await pilot.pause()
|
|
|
|
panel.post_message(TrackingPanel.StartRequested("127.0.0.1", 14538, 18.0))
|
|
await _wait_for_listening(app)
|
|
|
|
reader, writer = await asyncio.open_connection("127.0.0.1", 14538)
|
|
|
|
writer.write(b"q\n")
|
|
await writer.drain()
|
|
|
|
# Server should close our connection.
|
|
data = await asyncio.wait_for(reader.read(1024), timeout=3.0)
|
|
assert data == b"" # EOF
|
|
|
|
writer.close()
|
|
await writer.wait_closed()
|
|
|
|
# Server should still be listening for new connections.
|
|
await asyncio.sleep(0.5)
|
|
status = app.query_one("#tracking-status", TrackingStatus)
|
|
assert status.state == "LISTENING"
|
|
|
|
control._stop_rotctld()
|
|
await pilot.pause()
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_start_button_wiring():
|
|
"""Pressing Start Server button should start the server."""
|
|
app = BirdcageApp()
|
|
app.demo_mode = True
|
|
|
|
async with app.run_test(size=(120, 40)) as pilot:
|
|
await pilot.pause()
|
|
await _switch_to_track(pilot)
|
|
|
|
control = app.query_one("#control", ControlScreen)
|
|
control.switch_mode("track")
|
|
await pilot.pause()
|
|
|
|
# Set a unique port in the input.
|
|
from textual.widgets import Input
|
|
|
|
port_input = app.query_one("#track-port-input", Input)
|
|
port_input.value = "14539"
|
|
await pilot.pause()
|
|
|
|
# Press Start Server button.
|
|
start_btn = app.query_one("#btn-track-start", Button)
|
|
_press_button(start_btn)
|
|
await _wait_for_listening(app)
|
|
|
|
assert control._rotctld_server is not None
|
|
status = app.query_one("#tracking-status", TrackingStatus)
|
|
assert status.state == "LISTENING"
|
|
|
|
# Press Stop button.
|
|
stop_btn = app.query_one("#btn-track-stop", Button)
|
|
_press_button(stop_btn)
|
|
await asyncio.sleep(0.5)
|
|
|
|
assert control._rotctld_server is None
|
|
assert status.state == "STOPPED"
|