New F2 Control sub-mode that searches the Craft orbital catalog (22k+ objects), displays pass predictions, and drives the dish in real-time using server-computed AZ/EL positions from /api/sky/up. Tracking loop polls at ~1Hz, filters for the tracked target by target_type + target_id (str, not int — handles satellites, planets, stars, comets), and issues motor commands through the existing serial bridge. Verified end-to-end with Carryout G2 hardware tracking NOAA 17. New files: - craft_client.py — stdlib HTTP client (urllib only, no deps) - widgets/craft_panel.py — search table, pass info, tracking status - tests/test_craft_mode.py — 5 unit tests with mocked API - tests/test_craft_integration.py — 3 hardware integration tests
223 lines
7.2 KiB
Python
223 lines
7.2 KiB
Python
"""Integration test: Craft mode with real hardware.

Exercises the full Craft -> SerialBridge -> dish code path.
Requires /dev/ttyUSB2 (Carryout G2 via RS-422).

NOT part of the normal test suite -- run explicitly:
    uv run pytest tests/test_craft_integration.py -v -s
"""
|
|
|
|
import asyncio
|
|
import os
|
|
|
|
import pytest
|
|
|
|
from birdcage_tui.app import BirdcageApp
|
|
from birdcage_tui.screens.control import ControlScreen
|
|
from birdcage_tui.widgets.craft_panel import CraftPanel, CraftTrackingStatus
|
|
|
|
# Serial device node for the dish under test.  Every test in this module is
# skipped (see ``pytestmark``) when this path does not exist.
SERIAL_PORT = "/dev/ttyUSB2"
|
|
|
|
|
|
def _az_delta(a: float, b: float) -> float:
|
|
"""Minimum angular distance accounting for 360 wrap."""
|
|
d = abs(a - b) % 360.0
|
|
return min(d, 360.0 - d)
|
|
|
|
|
|
# Module-wide marker: skip every test here when the serial device node is
# absent, so the file can sit in the tree without failing on machines that
# have no dish attached.
pytestmark = pytest.mark.skipif(
    not os.path.exists(SERIAL_PORT),
    reason=f"{SERIAL_PORT} not available",
)
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_craft_search_with_real_api():
    """Search the live Craft API and verify results populate.

    Runs the app with ``demo_mode`` enabled, switches the control screen
    to its "craft" mode, posts a search for "ISS" to the Craft panel and
    checks that the results table ends up with at least one row.
    """
    from textual.widgets import DataTable

    app = BirdcageApp()
    app.demo_mode = True
    app.craft_url = "https://space.warehack.ing"

    async with app.run_test(size=(120, 40)) as pilot:
        await pilot.pause()

        control = app.query_one("#control", ControlScreen)
        control.switch_mode("craft")
        await pilot.pause()

        panel = app.query_one("#ctrl-craft-panel", CraftPanel)
        panel.post_message(CraftPanel.SearchRequested("ISS"))
        # Give the search request time to complete before inspecting
        # the table.
        await asyncio.sleep(3.0)

        table = app.query_one("#craft-results-table", DataTable)
        print(f" Search returned {table.row_count} results")
        assert table.row_count > 0, "Craft API returned no results for 'ISS'"
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_direct_motor_command():
    """Verify move_motor works through the bridge.

    Moves AZ by +1 degree and back. Isolates serial bridge
    independent of Craft.

    The return move runs in a ``finally`` block so that a failed
    assertion does not leave the physical dish away from where it
    started.
    """
    app = BirdcageApp()
    app.demo_mode = False
    app.serial_port = SERIAL_PORT
    app.firmware_name = "g2"
    app.skip_init = True

    async with app.run_test(size=(120, 40)) as pilot:
        await pilot.pause()
        await asyncio.sleep(1.0)

        assert app.device is not None
        assert app.device.is_connected

        pos = app.device.get_position()
        start_az = pos["azimuth"]
        start_el = pos["elevation"]
        print(f" Current: AZ={start_az:.2f} EL={start_el:.2f}")

        # Move AZ by +1 degree (stay within wrap range)
        target_az = (start_az + 1.0) % 360.0
        try:
            print(f" Moving AZ to {target_az:.2f}")
            app.device.move_motor(0, target_az)
            await asyncio.sleep(3.0)

            pos2 = app.device.get_position()
            print(f" After move: AZ={pos2['azimuth']:.2f} EL={pos2['elevation']:.2f}")

            # Compare with wrap-aware distance so a target near 0/360
            # is not reported as ~360 degrees off.
            delta = _az_delta(pos2["azimuth"], target_az)
            print(f" AZ delta from target: {delta:.2f} degrees")
            assert delta < 2.0, f"Dish didn't move close to target (delta={delta})"
        finally:
            # Move back -- even when the check above failed, so the
            # hardware is always restored to its starting azimuth.
            print(f" Returning to AZ={start_az:.2f}")
            app.device.move_motor(0, start_az)
            await asyncio.sleep(3.0)

            pos3 = app.device.get_position()
            print(f" Final: AZ={pos3['azimuth']:.2f} EL={pos3['elevation']:.2f}")
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_craft_tracking_moves_real_dish():
    """Track a target via Craft API and verify the dish gets commands.

    Uses the real serial device. Tracks briefly then returns
    to the original position.

    Tracking is stopped and the dish is sent back to its starting
    position in a ``finally`` block, so a failed assertion cannot leave
    the hardware tracking or displaced.
    """
    from textual.widgets import DataTable

    app = BirdcageApp()
    app.demo_mode = False
    app.serial_port = SERIAL_PORT
    app.firmware_name = "g2"
    app.skip_init = True
    app.craft_url = "https://space.warehack.ing"

    async with app.run_test(size=(120, 40)) as pilot:
        await pilot.pause()
        await asyncio.sleep(1.0)

        control = app.query_one("#control", ControlScreen)
        control.switch_mode("craft")
        await pilot.pause()

        assert app.device is not None
        assert app.device.is_connected
        print(f" Device connected on {SERIAL_PORT}")

        pos_before = app.device.get_position()
        print(
            f" Starting position: "
            f"AZ={pos_before['azimuth']:.2f} "
            f"EL={pos_before['elevation']:.2f}"
        )

        panel = app.query_one("#ctrl-craft-panel", CraftPanel)

        # Search for something likely above horizon, falling back to the
        # next candidate only when the previous search came back empty.
        table = None
        for query in ("Moon", "Jupiter", "Sun"):
            panel.post_message(CraftPanel.SearchRequested(query))
            await asyncio.sleep(3.0)
            table = app.query_one("#craft-results-table", DataTable)
            print(f" {query} search: {table.row_count} results")
            if table.row_count > 0:
                break

        assert table.row_count > 0, "No search results found"

        # Read the first row
        first_key = next(iter(table.rows))
        row = table.get_row(first_key)
        target_name = row[0]
        target_type = row[1]
        # Keep the id as a string: Craft target ids are matched as str
        # (planets, stars and comets are not guaranteed to be numeric),
        # so int() here could raise or hand over a non-matching type.
        target_id = str(row[2])
        print(f" Tracking: {target_name} ({target_type}:{target_id})")

        try:
            # Start tracking
            panel.post_message(
                CraftPanel.TrackRequested(
                    target_type=target_type,
                    target_id=target_id,
                    name=target_name,
                    min_el=18.0,
                )
            )

            # Let the tracking loop run a few cycles
            await asyncio.sleep(6.0)

            status = app.query_one("#craft-tracking-status", CraftTrackingStatus)
            print(
                f" State: {status.state}, "
                f"AZ={status.azimuth:.2f}, EL={status.elevation:.2f}, "
                f"moves={status.moves}, error={status.error!r}"
            )

            assert status.state in ("TRACKING", "WAITING"), (
                f"Unexpected state: {status.state}"
            )

            if status.state == "TRACKING":
                assert status.moves >= 1, "No motor commands issued"
                pos_after = app.device.get_position()
                print(
                    f" Position after: "
                    f"AZ={pos_after['azimuth']:.2f} "
                    f"EL={pos_after['elevation']:.2f}"
                )
                daz = _az_delta(pos_after["azimuth"], pos_before["azimuth"])
                del_ = abs(pos_after["elevation"] - pos_before["elevation"])
                print(f" Dish moved: dAZ={daz:.2f} dEL={del_:.2f}")
            else:
                print(" Target below horizon or min_el -- WAITING is OK")
        finally:
            # Stop tracking
            control._stop_craft_tracking()
            await pilot.pause()

            # Return to starting position
            print(
                f" Returning to: "
                f"AZ={pos_before['azimuth']:.2f} "
                f"EL={pos_before['elevation']:.2f}"
            )
            app.device.move_to(pos_before["azimuth"], pos_before["elevation"])
            await asyncio.sleep(2.0)

        print(" Integration test complete")
|