birdcage/tui/tests/test_craft_integration.py
Ryan Malloy 6c1e9da773 Add Craft mode — direct satellite tracking via space.warehack.ing API
New F2 Control sub-mode that searches the Craft orbital catalog (22k+
objects), displays pass predictions, and drives the dish in real-time
using server-computed AZ/EL positions from /api/sky/up.

Tracking loop polls at ~1Hz, filters for the tracked target by
target_type + target_id (str, not int — handles satellites, planets,
stars, comets), and issues motor commands through the existing serial
bridge. Verified end-to-end with Carryout G2 hardware tracking NOAA 17.

New files:
- craft_client.py — stdlib HTTP client (urllib only, no deps)
- widgets/craft_panel.py — search table, pass info, tracking status
- tests/test_craft_mode.py — 5 unit tests with mocked API
- tests/test_craft_integration.py — 3 hardware integration tests
2026-02-16 02:07:42 -07:00

223 lines
7.2 KiB
Python

"""Integration test: Craft mode with real hardware.
Exercises the full Craft -> SerialBridge -> dish code path.
Requires /dev/ttyUSB2 (Carryout G2 via RS-422).
NOT part of the normal test suite -- run explicitly:
uv run pytest tests/test_craft_integration.py -v -s
"""
import asyncio
import os
import pytest
from birdcage_tui.app import BirdcageApp
from birdcage_tui.screens.control import ControlScreen
from birdcage_tui.widgets.craft_panel import CraftPanel, CraftTrackingStatus
SERIAL_PORT = "/dev/ttyUSB2"
def _az_delta(a: float, b: float) -> float:
"""Minimum angular distance accounting for 360 wrap."""
d = abs(a - b) % 360.0
return min(d, 360.0 - d)
pytestmark = pytest.mark.skipif(
not os.path.exists(SERIAL_PORT),
reason=f"{SERIAL_PORT} not available",
)
@pytest.mark.asyncio
async def test_craft_search_with_real_api():
"""Search the live Craft API and verify results populate."""
app = BirdcageApp()
app.demo_mode = True
app.craft_url = "https://space.warehack.ing"
async with app.run_test(size=(120, 40)) as pilot:
await pilot.pause()
control = app.query_one("#control", ControlScreen)
control.switch_mode("craft")
await pilot.pause()
panel = app.query_one("#ctrl-craft-panel", CraftPanel)
panel.post_message(CraftPanel.SearchRequested("ISS"))
await asyncio.sleep(3.0)
from textual.widgets import DataTable
table = app.query_one("#craft-results-table", DataTable)
print(f" Search returned {table.row_count} results")
assert table.row_count > 0, "Craft API returned no results for 'ISS'"
@pytest.mark.asyncio
async def test_direct_motor_command():
"""Verify move_motor works through the bridge.
Moves AZ by +1 degree and back. Isolates serial bridge
independent of Craft.
"""
app = BirdcageApp()
app.demo_mode = False
app.serial_port = SERIAL_PORT
app.firmware_name = "g2"
app.skip_init = True
async with app.run_test(size=(120, 40)) as pilot:
await pilot.pause()
await asyncio.sleep(1.0)
assert app.device is not None
assert app.device.is_connected
pos = app.device.get_position()
start_az = pos["azimuth"]
start_el = pos["elevation"]
print(f" Current: AZ={start_az:.2f} EL={start_el:.2f}")
# Move AZ by +1 degree (stay within wrap range)
target_az = (start_az + 1.0) % 360.0
print(f" Moving AZ to {target_az:.2f}")
app.device.move_motor(0, target_az)
await asyncio.sleep(3.0)
pos2 = app.device.get_position()
print(f" After move: AZ={pos2['azimuth']:.2f} EL={pos2['elevation']:.2f}")
delta = _az_delta(pos2["azimuth"], target_az)
print(f" AZ delta from target: {delta:.2f} degrees")
assert delta < 2.0, f"Dish didn't move close to target (delta={delta})"
# Move back
print(f" Returning to AZ={start_az:.2f}")
app.device.move_motor(0, start_az)
await asyncio.sleep(3.0)
pos3 = app.device.get_position()
print(f" Final: AZ={pos3['azimuth']:.2f} EL={pos3['elevation']:.2f}")
@pytest.mark.asyncio
async def test_craft_tracking_moves_real_dish():
"""Track a target via Craft API and verify the dish gets commands.
Uses the real serial device. Tracks briefly then returns
to the original position.
"""
app = BirdcageApp()
app.demo_mode = False
app.serial_port = SERIAL_PORT
app.firmware_name = "g2"
app.skip_init = True
app.craft_url = "https://space.warehack.ing"
async with app.run_test(size=(120, 40)) as pilot:
await pilot.pause()
await asyncio.sleep(1.0)
control = app.query_one("#control", ControlScreen)
control.switch_mode("craft")
await pilot.pause()
assert app.device is not None
assert app.device.is_connected
print(f" Device connected on {SERIAL_PORT}")
pos_before = app.device.get_position()
print(
f" Starting position: "
f"AZ={pos_before['azimuth']:.2f} "
f"EL={pos_before['elevation']:.2f}"
)
panel = app.query_one("#ctrl-craft-panel", CraftPanel)
# Search for something likely above horizon
panel.post_message(CraftPanel.SearchRequested("Moon"))
await asyncio.sleep(3.0)
from textual.widgets import DataTable
table = app.query_one("#craft-results-table", DataTable)
print(f" Moon search: {table.row_count} results")
if table.row_count == 0:
panel.post_message(CraftPanel.SearchRequested("Jupiter"))
await asyncio.sleep(3.0)
print(f" Jupiter search: {table.row_count} results")
if table.row_count == 0:
panel.post_message(CraftPanel.SearchRequested("Sun"))
await asyncio.sleep(3.0)
print(f" Sun search: {table.row_count} results")
assert table.row_count > 0, "No search results found"
# Read the first row
first_key = list(table.rows.keys())[0]
row = table.get_row(first_key)
target_name = row[0]
target_type = row[1]
target_id = int(row[2])
print(f" Tracking: {target_name} ({target_type}:{target_id})")
# Start tracking
panel.post_message(
CraftPanel.TrackRequested(
target_type=target_type,
target_id=target_id,
name=target_name,
min_el=18.0,
)
)
# Let the tracking loop run a few cycles
await asyncio.sleep(6.0)
status = app.query_one("#craft-tracking-status", CraftTrackingStatus)
print(
f" State: {status.state}, "
f"AZ={status.azimuth:.2f}, EL={status.elevation:.2f}, "
f"moves={status.moves}, error={status.error!r}"
)
assert status.state in ("TRACKING", "WAITING"), (
f"Unexpected state: {status.state}"
)
if status.state == "TRACKING":
assert status.moves >= 1, "No motor commands issued"
pos_after = app.device.get_position()
print(
f" Position after: "
f"AZ={pos_after['azimuth']:.2f} "
f"EL={pos_after['elevation']:.2f}"
)
daz = _az_delta(pos_after["azimuth"], pos_before["azimuth"])
del_ = abs(pos_after["elevation"] - pos_before["elevation"])
print(f" Dish moved: dAZ={daz:.2f} dEL={del_:.2f}")
else:
print(" Target below horizon or min_el -- WAITING is OK")
# Stop tracking
control._stop_craft_tracking()
await pilot.pause()
# Return to starting position
print(
f" Returning to: "
f"AZ={pos_before['azimuth']:.2f} "
f"EL={pos_before['elevation']:.2f}"
)
app.device.move_to(pos_before["azimuth"], pos_before["elevation"])
await asyncio.sleep(2.0)
print(" Integration test complete")