Add Craft mode — direct satellite tracking via space.warehack.ing API

New F2 Control sub-mode that searches the Craft orbital catalog (22k+
objects), displays pass predictions, and drives the dish in real-time
using server-computed AZ/EL positions from /api/sky/up.

Tracking loop polls at ~1Hz, filters for the tracked target by
target_type + target_id (str, not int — handles satellites, planets,
stars, comets), and issues motor commands through the existing serial
bridge. Verified end-to-end with Carryout G2 hardware tracking NOAA 17.

New files:
- craft_client.py — stdlib HTTP client (urllib only, no deps)
- widgets/craft_panel.py — search table, pass info, tracking status
- tests/test_craft_mode.py — 5 unit tests with mocked API
- tests/test_craft_integration.py — 3 hardware integration tests
This commit is contained in:
Ryan Malloy 2026-02-16 02:07:42 -07:00
parent a249c98208
commit 6c1e9da773
9 changed files with 1379 additions and 85 deletions

View File

@ -54,6 +54,7 @@ class BirdcageApp(App):
serial_port: str = "/dev/ttyUSB0"
firmware_name: str = "g2"
skip_init: bool = False
craft_url: str = "https://space.warehack.ing"
device: object = None
shutdown_event: threading.Event = threading.Event()
@ -110,6 +111,7 @@ class BirdcageApp(App):
self.run_worker(self._initialize_device, thread=True)
self._distribute_device()
self._setup_craft_client()
self._update_status_strip_connection()
self._install_console()
self._start_position_poll()
@ -129,6 +131,18 @@ class BirdcageApp(App):
if hasattr(screen, "set_device"):
screen.set_device(self.device)
def _setup_craft_client(self) -> None:
"""Create a Craft API client and hand it to the control screen."""
from birdcage_tui.craft_client import CraftClient
client = CraftClient(base_url=self.craft_url)
try:
control = self.query_one("#control")
if hasattr(control, "set_craft_client"):
control.set_craft_client(client)
except Exception:
log.debug("Could not set craft client on control screen")
def _update_status_strip_connection(self) -> None:
"""Set the status strip's connection info from current device."""
strip = self.query_one("#status-strip", StatusStrip)
@ -310,6 +324,11 @@ def main() -> None:
parser.add_argument(
"--skip-init", action="store_true", help="Skip firmware initialization"
)
parser.add_argument(
"--craft-url",
default="https://space.warehack.ing",
help="Craft API base URL",
)
args = parser.parse_args()
app = BirdcageApp()
@ -317,6 +336,7 @@ def main() -> None:
app.serial_port = args.port
app.firmware_name = args.firmware
app.skip_init = args.skip_init
app.craft_url = args.craft_url
try:
app.run()
except KeyboardInterrupt:

View File

@ -0,0 +1,238 @@
"""Craft API client — stdlib-only HTTP client for space.warehack.ing.
Provides satellite search, pass predictions, and real-time sky positions
from the Craft orbital mechanics API. All methods are blocking designed
to be called from @work(thread=True) workers in the TUI.
Uses urllib.request + json only (no requests/httpx dependency).
"""
import json
import logging
import time
import urllib.error
import urllib.parse
import urllib.request
from dataclasses import dataclass, field
log = logging.getLogger(__name__)
@dataclass
class SearchResult:
"""A single result from the Craft search API."""
name: str
target_type: str
target_id: str # NORAD int for satellites, string for planets/stars/comets
score: float = 0.0
groups: list[str] = field(default_factory=list)
altitude_deg: float | None = None
@dataclass
class PassPrediction:
"""A single satellite pass prediction."""
satellite_name: str
norad_id: int
aos_time: str
aos_az: float
tca_time: str
tca_alt: float
tca_az: float
los_time: str
los_az: float
max_elevation: float
duration_seconds: int
is_visible: bool = False
magnitude: float | None = None
@dataclass
class TargetPosition:
"""Real-time position of an above-horizon object."""
name: str
target_type: str
target_id: str # NORAD int for satellites, string for planets/stars/comets
azimuth: float
altitude: float
distance_km: float = 0.0
range_rate: float = 0.0
is_above_horizon: bool = True
@dataclass
class CraftTrackingState:
"""Tracking loop state mirroring TrackingState from rotctld_server."""
status: str = "IDLE"
target_name: str = ""
azimuth: float = 0.0
elevation: float = 0.0
distance_km: float = 0.0
range_rate: float = 0.0
moves: int = 0
rate: float = 0.0
error: str = ""
_move_timestamps: list[float] = field(default_factory=list, repr=False)
def record_move(self) -> None:
self.moves += 1
now = time.monotonic()
self._move_timestamps.append(now)
cutoff = now - 30.0
self._move_timestamps = [t for t in self._move_timestamps if t > cutoff]
elapsed = now - self._move_timestamps[0]
if elapsed > 0:
self.rate = len(self._move_timestamps) / elapsed
else:
self.rate = 0.0
class CraftClient:
"""HTTP client for the Craft orbital mechanics API.
Args:
base_url: Craft API base URL.
timeout: HTTP request timeout in seconds.
"""
def __init__(
self,
base_url: str = "https://space.warehack.ing",
timeout: float = 10.0,
) -> None:
self._base_url = base_url.rstrip("/")
self._timeout = timeout
def _get(self, path: str, params: dict | None = None) -> dict:
"""Issue a GET request and return parsed JSON."""
url = f"{self._base_url}{path}"
if params:
url += "?" + urllib.parse.urlencode(params)
req = urllib.request.Request(url, headers={"Accept": "application/json"})
with urllib.request.urlopen(req, timeout=self._timeout) as resp:
return json.loads(resp.read().decode())
def health(self) -> bool:
"""Check if the Craft API is reachable."""
try:
self._get("/api/search", {"q": "ISS", "limit": "1"})
return True
except Exception:
log.debug("Craft API health check failed", exc_info=True)
return False
def search(self, query: str, limit: int = 20) -> list[SearchResult]:
"""Search the Craft object catalog.
Returns satellites, planets, stars, comets matching the query.
"""
try:
data = self._get("/api/search", {"q": query, "limit": str(limit)})
except Exception:
log.warning("Craft search failed for %r", query, exc_info=True)
return []
results = []
for item in data.get("results", []):
results.append(
SearchResult(
name=item.get("name", ""),
target_type=item.get("target_type", ""),
target_id=str(item.get("target_id", "")),
score=float(item.get("score", 0)),
groups=item.get("groups", []),
altitude_deg=item.get("altitude_deg"),
)
)
return results
def get_passes(self, norad_id: int, hours: int = 24) -> list[PassPrediction]:
"""Get pass predictions for a satellite."""
try:
data = self._get("/api/passes", {"sat": str(norad_id), "hours": str(hours)})
except Exception:
log.warning("Craft passes failed for NORAD %d", norad_id, exc_info=True)
return []
passes = []
for p in data.get("passes", []):
passes.append(
PassPrediction(
satellite_name=p.get("satellite_name", ""),
norad_id=int(p.get("norad_id", norad_id)),
aos_time=p.get("aos_time", ""),
aos_az=float(p.get("aos_az", 0)),
tca_time=p.get("tca_time", ""),
tca_alt=float(p.get("tca_alt", 0)),
tca_az=float(p.get("tca_az", 0)),
los_time=p.get("los_time", ""),
los_az=float(p.get("los_az", 0)),
max_elevation=float(p.get("max_elevation", 0)),
duration_seconds=int(p.get("duration_seconds", 0)),
is_visible=bool(p.get("is_visible", False)),
magnitude=p.get("magnitude"),
)
)
return passes
def get_next_pass(self, norad_id: int) -> PassPrediction | None:
"""Get the next upcoming pass for a satellite."""
try:
data = self._get("/api/passes/next", {"sat": str(norad_id)})
except Exception:
log.warning("Craft next pass failed for NORAD %d", norad_id, exc_info=True)
return None
passes = data.get("passes", [])
if not passes:
return None
p = passes[0]
return PassPrediction(
satellite_name=p.get("satellite_name", ""),
norad_id=int(p.get("norad_id", norad_id)),
aos_time=p.get("aos_time", ""),
aos_az=float(p.get("aos_az", 0)),
tca_time=p.get("tca_time", ""),
tca_alt=float(p.get("tca_alt", 0)),
tca_az=float(p.get("tca_az", 0)),
los_time=p.get("los_time", ""),
los_az=float(p.get("los_az", 0)),
max_elevation=float(p.get("max_elevation", 0)),
duration_seconds=int(p.get("duration_seconds", 0)),
is_visible=bool(p.get("is_visible", False)),
magnitude=p.get("magnitude"),
)
def get_visible_targets(self, min_alt: float = 0.0) -> list[TargetPosition]:
"""Get all above-horizon objects with real-time AZ/EL positions.
This is the primary tracking endpoint returns 1000+ objects
(satellites, planets, stars, comets) with positions computed
server-side via Postgres SGP4.
"""
try:
data = self._get("/api/sky/up", {"min_alt": str(min_alt)})
except Exception:
log.warning("Craft sky/up failed", exc_info=True)
return []
targets = []
for obj in data.get("objects", []):
targets.append(
TargetPosition(
name=obj.get("name", ""),
target_type=obj.get("target_type", ""),
target_id=str(obj.get("target_id", "")),
azimuth=float(obj.get("azimuth_deg", 0)),
altitude=float(obj.get("altitude_deg", 0)),
distance_km=float(obj.get("distance_km", 0)),
range_rate=float(obj.get("range_rate_km_s", 0)),
is_above_horizon=bool(obj.get("is_above_horizon", True)),
)
)
return targets

View File

@ -24,9 +24,7 @@ class TrackingState:
client: str = ""
moves: int = 0
rate: float = 0.0
_move_timestamps: list[float] = field(
default_factory=list, repr=False
)
_move_timestamps: list[float] = field(default_factory=list, repr=False)
def record_move(self) -> None:
self.moves += 1
@ -34,9 +32,7 @@ class TrackingState:
self._move_timestamps.append(now)
# Keep only last 30 seconds of timestamps for rate calc.
cutoff = now - 30.0
self._move_timestamps = [
t for t in self._move_timestamps if t > cutoff
]
self._move_timestamps = [t for t in self._move_timestamps if t > cutoff]
elapsed = now - self._move_timestamps[0]
if elapsed > 0:
self.rate = len(self._move_timestamps) / elapsed
@ -85,18 +81,12 @@ class TuiRotctldServer:
Blocks until stop() is called.
"""
self._server_socket = socket.socket(
socket.AF_INET, socket.SOCK_STREAM
)
self._server_socket.setsockopt(
socket.SOL_SOCKET, socket.SO_REUSEADDR, 1
)
self._server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self._server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
try:
self._server_socket.bind((self._host, self._port))
except OSError:
logger.error(
"Failed to bind %s:%d", self._host, self._port
)
logger.error("Failed to bind %s:%d", self._host, self._port)
self._state.status = "STOPPED"
self._notify()
return
@ -134,14 +124,10 @@ class TuiRotctldServer:
try:
self._handle_connection(conn)
except Exception:
logger.exception(
"Error handling rotctld client %s", addr_str
)
logger.exception("Error handling rotctld client %s", addr_str)
finally:
conn.close()
logger.info(
"rotctld client disconnected: %s", addr_str
)
logger.info("rotctld client disconnected: %s", addr_str)
if self._running:
self._state.status = "LISTENING"
self._state.client = ""
@ -187,15 +173,11 @@ class TuiRotctldServer:
conn.sendall(b"RPRT 0\n")
return
elif cmd == "_":
conn.sendall(
f"{MODEL_NAME}\n".encode()
)
conn.sendall(f"{MODEL_NAME}\n".encode())
elif cmd == "q":
return
else:
logger.warning(
"Unknown rotctld command: %s", cmd
)
logger.warning("Unknown rotctld command: %s", cmd)
conn.sendall(b"RPRT -1\n")
def _cmd_get_position(self, conn: socket.socket) -> None:
@ -208,9 +190,7 @@ class TuiRotctldServer:
logger.exception("Failed to get position for rotctld")
conn.sendall(b"RPRT -1\n")
def _cmd_set_position(
self, conn: socket.socket, parts: list[str]
) -> None:
def _cmd_set_position(self, conn: socket.socket, parts: list[str]) -> None:
try:
target_az = float(parts[1])
target_el = float(parts[2])

View File

@ -16,8 +16,10 @@ from textual.containers import Container, Horizontal, Vertical
from textual.widgets import Button, ContentSwitcher, Input, Static
from textual.worker import Worker
from birdcage_tui.craft_client import CraftClient, CraftTrackingState
from birdcage_tui.rotctld_server import TrackingState, TuiRotctldServer
from birdcage_tui.widgets.compass_rose import CompassRose
from birdcage_tui.widgets.craft_panel import CraftPanel
from birdcage_tui.widgets.mode_bar import ModeBar
from birdcage_tui.widgets.motor_status import MotorStatus
from birdcage_tui.widgets.preset_list import PresetList
@ -56,6 +58,12 @@ class ControlScreen(Container):
self._step_size: float = 1.0
self._rotctld_server: TuiRotctldServer | None = None
self._rotctld_thread: threading.Thread | None = None
# Craft tracking state
self._craft_client: CraftClient | None = None
self._craft_tracking: bool = False
self._craft_state: CraftTrackingState = CraftTrackingState()
self._craft_target_type: str = ""
self._craft_target_id: str = ""
# ------------------------------------------------------------------
# Compose
@ -68,6 +76,7 @@ class ControlScreen(Container):
"manual": "Manual",
"presets": "Presets",
"track": "Track",
"craft": "Craft",
},
initial="manual",
classes="mode-bar",
@ -145,6 +154,11 @@ class ControlScreen(Container):
yield Static("Satellite Tracking", classes="panel-title")
yield TrackingPanel(id="ctrl-tracking-panel")
# -- Craft mode --
with Container(id="craft"), Vertical(classes="panel"):
yield Static("Craft Tracking", classes="panel-title")
yield CraftPanel(id="ctrl-craft-panel")
# ------------------------------------------------------------------
# Device lifecycle
# ------------------------------------------------------------------
@ -161,9 +175,10 @@ class ControlScreen(Container):
self._start_data_poll()
def on_unmount(self) -> None:
"""Stop polling thread and rotctld server on teardown."""
"""Stop polling thread, rotctld server, and craft tracking on teardown."""
self._polling = False
self._stop_rotctld()
self._stop_craft_tracking()
# ------------------------------------------------------------------
# Mode switching
@ -466,15 +481,11 @@ class ControlScreen(Container):
self, event: TrackingPanel.StartRequested
) -> None:
if self._device is None:
self.app.notify(
"No device connected", severity="warning"
)
self.app.notify("No device connected", severity="warning")
return
if self._rotctld_server is not None:
self.app.notify(
"Server already running", severity="warning"
)
self.app.notify("Server already running", severity="warning")
return
log.info(
@ -485,9 +496,7 @@ class ControlScreen(Container):
)
def status_callback(state: TrackingState) -> None:
self.app.call_from_thread(
self._apply_tracking_state, state
)
self.app.call_from_thread(self._apply_tracking_state, state)
self._rotctld_server = TuiRotctldServer(
device=self._device,
@ -526,9 +535,7 @@ class ControlScreen(Container):
def _apply_tracking_state(self, state: TrackingState) -> None:
try:
panel = self.query_one(
"#ctrl-tracking-panel", TrackingPanel
)
panel = self.query_one("#ctrl-tracking-panel", TrackingPanel)
panel.set_status(
state=state.status,
client=state.client,
@ -538,6 +545,224 @@ class ControlScreen(Container):
except Exception:
pass
# ------------------------------------------------------------------
# Craft API integration
# ------------------------------------------------------------------
def set_craft_client(self, client: CraftClient) -> None:
"""Store the Craft API client reference."""
self._craft_client = client
def on_craft_panel_search_requested(
self, event: CraftPanel.SearchRequested
) -> None:
if self._craft_client is None:
self.app.notify("Craft API not configured", severity="warning")
return
self._do_craft_search(event.query)
def on_craft_panel_passes_requested(
self, event: CraftPanel.PassesRequested
) -> None:
if self._craft_client is None:
self.app.notify("Craft API not configured", severity="warning")
return
self._do_craft_passes(event.norad_id)
def on_craft_panel_track_requested(self, event: CraftPanel.TrackRequested) -> None:
if self._craft_client is None:
self.app.notify("Craft API not configured", severity="warning")
return
if self._device is None:
self.app.notify("No device connected", severity="warning")
return
if self._craft_tracking:
self.app.notify("Already tracking", severity="warning")
return
self._craft_target_type = event.target_type
self._craft_target_id = event.target_id
self._craft_state = CraftTrackingState(
status="TRACKING", target_name=event.name
)
self._craft_tracking = True
self._run_craft_tracking(
event.name, event.target_type, event.target_id, event.min_el
)
def on_craft_panel_stop_tracking_requested(
self, _event: CraftPanel.StopTrackingRequested
) -> None:
self._stop_craft_tracking()
def _stop_craft_tracking(self) -> None:
if not self._craft_tracking:
return
self._craft_tracking = False
self._craft_state.status = "IDLE"
try:
panel = self.query_one("#ctrl-craft-panel", CraftPanel)
panel.set_tracking_status(state="IDLE")
except Exception:
pass
@work(thread=True, exclusive=True, group="craft-search")
def _do_craft_search(self, query: str) -> None:
"""Search the Craft catalog in a worker thread."""
try:
results = self._craft_client.search(query)
rows = [
{
"name": r.name,
"target_type": r.target_type,
"target_id": r.target_id,
"groups": r.groups,
}
for r in results
]
self.app.call_from_thread(self._apply_craft_search, rows)
except Exception:
log.exception("Craft search failed")
self.app.call_from_thread(
self.app.notify, "Search failed", severity="error"
)
def _apply_craft_search(self, rows: list[dict]) -> None:
try:
panel = self.query_one("#ctrl-craft-panel", CraftPanel)
panel.set_search_results(rows)
self.app.notify(f"Found {len(rows)} results")
except Exception:
pass
@work(thread=True, exclusive=True, group="craft-passes")
def _do_craft_passes(self, norad_id: int) -> None:
"""Fetch pass predictions in a worker thread."""
try:
passes = self._craft_client.get_passes(norad_id)
if not passes:
self.app.call_from_thread(
self._apply_craft_passes, "No upcoming passes"
)
return
lines = []
for p in passes[:8]:
# Format: AOS time MaxEL Duration
aos = p.aos_time[:16] if len(p.aos_time) > 16 else p.aos_time
mins = p.duration_seconds // 60
secs = p.duration_seconds % 60
vis = " *" if p.is_visible else ""
lines.append(
f" {aos} MaxEL {p.max_elevation:5.1f}\u00b0"
f" {mins}m{secs:02d}s{vis}"
)
text = "\n".join(lines)
self.app.call_from_thread(self._apply_craft_passes, text)
except Exception:
log.exception("Craft passes failed")
self.app.call_from_thread(
self.app.notify, "Pass lookup failed", severity="error"
)
def _apply_craft_passes(self, text: str) -> None:
try:
panel = self.query_one("#ctrl-craft-panel", CraftPanel)
panel.set_passes(text)
except Exception:
pass
@work(thread=True, exclusive=True, group="craft-track")
def _run_craft_tracking(
self,
name: str,
target_type: str,
target_id: str,
min_el: float,
) -> None:
"""Poll Craft API at ~1 Hz and drive the dish to track a target."""
shutdown = self.app.shutdown_event
state = self._craft_state
state.status = "TRACKING"
state.target_name = name
state.error = ""
self.app.call_from_thread(self._apply_craft_state, state)
while self._craft_tracking and not shutdown.is_set():
try:
targets = self._craft_client.get_visible_targets(min_alt=0.0)
except Exception:
state.status = "ERROR"
state.error = "API request failed"
self.app.call_from_thread(self._apply_craft_state, state)
shutdown.wait(5.0)
continue
# Find our target
match = None
for t in targets:
if t.target_type == target_type and t.target_id == target_id:
match = t
break
if match is None:
state.status = "WAITING"
state.error = "Target below horizon"
state.azimuth = 0.0
state.elevation = 0.0
self.app.call_from_thread(self._apply_craft_state, state)
shutdown.wait(1.0)
continue
state.azimuth = match.azimuth
state.elevation = match.altitude
state.distance_km = match.distance_km
state.range_rate = match.range_rate
if match.altitude < min_el:
state.status = "WAITING"
state.error = f"EL {match.altitude:.1f}\u00b0 < min {min_el:.1f}\u00b0"
self.app.call_from_thread(self._apply_craft_state, state)
shutdown.wait(1.0)
continue
# Drive the dish
state.status = "TRACKING"
state.error = ""
try:
self._device.move_motor(0, match.azimuth)
self._device.move_motor(1, match.altitude)
state.record_move()
except Exception:
log.exception("Craft motor command failed")
state.error = "Motor command failed"
self.app.call_from_thread(self._apply_craft_state, state)
shutdown.wait(1.0)
# Clean exit
state.status = "IDLE"
state.error = ""
self.app.call_from_thread(self._apply_craft_state, state)
def _apply_craft_state(self, state: CraftTrackingState) -> None:
try:
panel = self.query_one("#ctrl-craft-panel", CraftPanel)
panel.set_tracking_status(
state=state.status,
target_name=state.target_name,
azimuth=state.azimuth,
elevation=state.elevation,
distance_km=state.distance_km,
range_rate=state.range_rate,
moves=state.moves,
rate=state.rate,
error=state.error,
)
except Exception:
pass
# ------------------------------------------------------------------
# Key binding actions
# ------------------------------------------------------------------

View File

@ -708,6 +708,72 @@ ProgressBar PercentageStatus {
margin-left: 1;
}
/* ── Craft Tracking Panel ─────────────────────────── */
.craft-search-bar {
layout: horizontal;
height: 3;
padding: 0 1;
dock: top;
background: #0e1420;
border-bottom: solid #1a2a38;
}
.craft-search-bar Input {
width: 1fr;
margin-right: 1;
}
.craft-search-bar Button {
width: 12;
}
#craft-results-table {
height: 1fr;
min-height: 6;
margin: 0 1;
}
#craft-pass-info {
height: auto;
min-height: 3;
padding: 1 2;
background: #0e1420;
border: round #1a2a3a;
margin: 0 1;
}
#craft-tracking-status {
height: auto;
min-height: 5;
padding: 1 2;
background: #0e1420;
border: round #1a2a3a;
margin: 0 1;
}
.craft-controls {
dock: bottom;
height: auto;
padding: 1;
background: #0e1420;
border-top: solid #1a2a38;
layout: horizontal;
}
.craft-controls .label {
width: auto;
padding: 1 1 0 0;
}
.craft-controls Input {
width: 8;
}
.craft-controls Button {
margin-right: 1;
}
/* ── Scrollbar Styling ─────────────────────────────── */
* {

View File

@ -0,0 +1,312 @@
"""Craft tracking panel — search, pass predictions, and live satellite tracking.
Provides the UI surface for the Craft API integration. Posts messages
upward to ControlScreen which handles the actual HTTP calls and motor
commands in worker threads.
"""
from rich.text import Text
from textual.app import ComposeResult
from textual.containers import Container, Horizontal
from textual.message import Message
from textual.reactive import reactive
from textual.widgets import Button, DataTable, Input, Static
class CraftPanel(Container):
"""Panel for Craft satellite tracking — search, passes, and live track.
All heavy lifting (HTTP calls, motor commands) is handled by the parent
ControlScreen. This widget is the control surface only.
"""
class SearchRequested(Message):
def __init__(self, query: str) -> None:
super().__init__()
self.query = query
class TrackRequested(Message):
def __init__(
self,
target_type: str,
target_id: str,
name: str,
min_el: float,
) -> None:
super().__init__()
self.target_type = target_type
self.target_id = target_id
self.name = name
self.min_el = min_el
class StopTrackingRequested(Message):
pass
class PassesRequested(Message):
def __init__(self, norad_id: int) -> None:
super().__init__()
self.norad_id = norad_id
def compose(self) -> ComposeResult:
with Horizontal(classes="craft-search-bar"):
yield Input(
placeholder="Search satellites, planets...",
id="craft-search-input",
)
yield Button("Search", id="btn-craft-search", variant="primary")
yield DataTable(id="craft-results-table")
yield CraftPassInfo(id="craft-pass-info")
yield CraftTrackingStatus(id="craft-tracking-status")
with Horizontal(classes="craft-controls"):
yield Button("Track", id="btn-craft-track", variant="primary")
yield Button("Stop", id="btn-craft-stop")
yield Button("Passes", id="btn-craft-passes")
yield Static(" Min EL: ", classes="label")
yield Input(value="18.0", id="craft-minel-input", type="number")
def on_mount(self) -> None:
table = self.query_one("#craft-results-table", DataTable)
table.add_columns("Name", "Type", "ID", "Groups")
table.cursor_type = "row"
table.zebra_stripes = True
# ------------------------------------------------------------------
# Public API (called by ControlScreen)
# ------------------------------------------------------------------
def set_search_results(self, results: list[dict]) -> None:
"""Populate the results table from search data."""
table = self.query_one("#craft-results-table", DataTable)
table.clear()
for r in results:
groups = ", ".join(r.get("groups", []))
table.add_row(
r.get("name", ""),
r.get("target_type", ""),
str(r.get("target_id", "")),
groups,
key=f"{r.get('target_type', '')}:{r.get('target_id', '')}",
)
def set_passes(self, passes_text: str) -> None:
"""Update the pass prediction display."""
info = self.query_one("#craft-pass-info", CraftPassInfo)
info.passes_text = passes_text
def set_tracking_status(
self,
state: str = "IDLE",
target_name: str = "",
azimuth: float = 0.0,
elevation: float = 0.0,
distance_km: float = 0.0,
range_rate: float = 0.0,
moves: int = 0,
rate: float = 0.0,
error: str = "",
) -> None:
"""Update the tracking status display."""
status = self.query_one("#craft-tracking-status", CraftTrackingStatus)
status.state = state
status.target_name = target_name
status.azimuth = azimuth
status.elevation = elevation
status.distance_km = distance_km
status.range_rate = range_rate
status.moves = moves
status.rate = rate
status.error = error
# ------------------------------------------------------------------
# Selected row helpers
# ------------------------------------------------------------------
def _get_selected_row(self) -> dict | None:
"""Return the selected row's data as a dict, or None."""
table = self.query_one("#craft-results-table", DataTable)
if table.row_count == 0:
return None
try:
row_key, _ = table.coordinate_to_cell_key(table.cursor_coordinate)
except Exception:
return None
row = table.get_row(row_key)
if not row:
return None
return {
"name": row[0],
"target_type": row[1],
"target_id": row[2],
}
# ------------------------------------------------------------------
# Button handlers
# ------------------------------------------------------------------
def on_button_pressed(self, event: Button.Pressed) -> None:
button_id = event.button.id or ""
if button_id == "btn-craft-search":
self._handle_search()
elif button_id == "btn-craft-track":
self._handle_track()
elif button_id == "btn-craft-stop":
self._handle_stop()
elif button_id == "btn-craft-passes":
self._handle_passes()
def on_input_submitted(self, event: Input.Submitted) -> None:
if event.input.id == "craft-search-input":
self._handle_search()
def _handle_search(self) -> None:
query = self.query_one("#craft-search-input", Input).value.strip()
if not query:
self.app.notify("Enter a search term", severity="warning")
return
self.post_message(self.SearchRequested(query))
def _handle_track(self) -> None:
row = self._get_selected_row()
if row is None:
self.app.notify("Select a target first", severity="warning")
return
try:
min_el = float(self.query_one("#craft-minel-input", Input).value)
except ValueError:
min_el = 18.0
self.post_message(
self.TrackRequested(
target_type=row["target_type"],
target_id=row["target_id"],
name=row["name"],
min_el=min_el,
)
)
def _handle_stop(self) -> None:
self.post_message(self.StopTrackingRequested())
def _handle_passes(self) -> None:
row = self._get_selected_row()
if row is None:
self.app.notify("Select a satellite first", severity="warning")
return
if row["target_type"] != "satellite":
self.app.notify("Pass predictions require a satellite", severity="warning")
return
self.post_message(self.PassesRequested(norad_id=int(row["target_id"])))
class CraftTrackingStatus(Static):
"""Rich text display of Craft tracking state."""
state: reactive[str] = reactive("IDLE")
target_name: reactive[str] = reactive("")
azimuth: reactive[float] = reactive(0.0)
elevation: reactive[float] = reactive(0.0)
distance_km: reactive[float] = reactive(0.0)
range_rate: reactive[float] = reactive(0.0)
moves: reactive[int] = reactive(0)
rate: reactive[float] = reactive(0.0)
error: reactive[str] = reactive("")
def render(self) -> Text:
result = Text()
w = 10
# Status
result.append("Status".ljust(w), style="#506878")
if self.state == "TRACKING":
result.append("TRACKING", style="#00e060 bold")
elif self.state == "WAITING":
result.append("WAITING", style="#e8a020 bold")
elif self.state == "ERROR":
result.append("ERROR", style="#e04040 bold")
else:
result.append("IDLE", style="#506878")
result.append("\n")
# Target
result.append("Target".ljust(w), style="#506878")
if self.target_name:
result.append(self.target_name, style="#c8d0d8")
else:
result.append("(none)", style="#384858")
result.append("\n")
# Position
result.append("Sat AZ".ljust(w), style="#506878")
result.append(f"{self.azimuth:.2f}", style="#00d4aa")
result.append(" Sat EL ", style="#506878")
result.append(f"{self.elevation:.2f}", style="#00d4aa")
result.append("\n")
# Distance + range rate
result.append("Dist".ljust(w), style="#506878")
result.append(f"{self.distance_km:.0f} km", style="#c8d0d8")
result.append(" Rate: ", style="#506878")
rate_style = "#e04040" if self.range_rate > 0 else "#00e060"
result.append(f"{self.range_rate:+.1f} km/s", style=rate_style)
result.append("\n")
# Move stats
result.append("Moves".ljust(w), style="#506878")
result.append(f"{self.moves}", style="#c8d0d8")
result.append(" Rate: ", style="#506878")
result.append(f"{self.rate:.1f}/s", style="#c8d0d8")
# Error
if self.error:
result.append("\n")
result.append("Error".ljust(w), style="#506878")
result.append(self.error, style="#e04040")
return result
def watch_state(self, _v: str) -> None:
self.refresh()
def watch_target_name(self, _v: str) -> None:
self.refresh()
def watch_azimuth(self, _v: float) -> None:
self.refresh()
def watch_elevation(self, _v: float) -> None:
self.refresh()
def watch_distance_km(self, _v: float) -> None:
self.refresh()
def watch_range_rate(self, _v: float) -> None:
self.refresh()
def watch_moves(self, _v: int) -> None:
self.refresh()
def watch_rate(self, _v: float) -> None:
self.refresh()
def watch_error(self, _v: str) -> None:
self.refresh()
class CraftPassInfo(Static):
"""Display upcoming pass predictions as formatted text."""
passes_text: reactive[str] = reactive("")
def render(self) -> Text:
result = Text()
result.append("Upcoming Passes\n", style="#00d4aa bold")
if self.passes_text:
result.append(self.passes_text, style="#c8d0d8")
else:
result.append("(search and select a satellite)", style="#384858")
return result
def watch_passes_text(self, _v: str) -> None:
self.refresh()

View File

@ -0,0 +1,222 @@
"""Integration test: Craft mode with real hardware.
Exercises the full Craft -> SerialBridge -> dish code path.
Requires /dev/ttyUSB2 (Carryout G2 via RS-422).
NOT part of the normal test suite -- run explicitly:
uv run pytest tests/test_craft_integration.py -v -s
"""
import asyncio
import os
import pytest
from birdcage_tui.app import BirdcageApp
from birdcage_tui.screens.control import ControlScreen
from birdcage_tui.widgets.craft_panel import CraftPanel, CraftTrackingStatus
SERIAL_PORT = "/dev/ttyUSB2"
def _az_delta(a: float, b: float) -> float:
"""Minimum angular distance accounting for 360 wrap."""
d = abs(a - b) % 360.0
return min(d, 360.0 - d)
pytestmark = pytest.mark.skipif(
not os.path.exists(SERIAL_PORT),
reason=f"{SERIAL_PORT} not available",
)
@pytest.mark.asyncio
async def test_craft_search_with_real_api():
"""Search the live Craft API and verify results populate."""
app = BirdcageApp()
app.demo_mode = True
app.craft_url = "https://space.warehack.ing"
async with app.run_test(size=(120, 40)) as pilot:
await pilot.pause()
control = app.query_one("#control", ControlScreen)
control.switch_mode("craft")
await pilot.pause()
panel = app.query_one("#ctrl-craft-panel", CraftPanel)
panel.post_message(CraftPanel.SearchRequested("ISS"))
await asyncio.sleep(3.0)
from textual.widgets import DataTable
table = app.query_one("#craft-results-table", DataTable)
print(f" Search returned {table.row_count} results")
assert table.row_count > 0, "Craft API returned no results for 'ISS'"
@pytest.mark.asyncio
async def test_direct_motor_command():
"""Verify move_motor works through the bridge.
Moves AZ by +1 degree and back. Isolates serial bridge
independent of Craft.
"""
app = BirdcageApp()
app.demo_mode = False
app.serial_port = SERIAL_PORT
app.firmware_name = "g2"
app.skip_init = True
async with app.run_test(size=(120, 40)) as pilot:
await pilot.pause()
await asyncio.sleep(1.0)
assert app.device is not None
assert app.device.is_connected
pos = app.device.get_position()
start_az = pos["azimuth"]
start_el = pos["elevation"]
print(f" Current: AZ={start_az:.2f} EL={start_el:.2f}")
# Move AZ by +1 degree (stay within wrap range)
target_az = (start_az + 1.0) % 360.0
print(f" Moving AZ to {target_az:.2f}")
app.device.move_motor(0, target_az)
await asyncio.sleep(3.0)
pos2 = app.device.get_position()
print(f" After move: AZ={pos2['azimuth']:.2f} EL={pos2['elevation']:.2f}")
delta = _az_delta(pos2["azimuth"], target_az)
print(f" AZ delta from target: {delta:.2f} degrees")
assert delta < 2.0, f"Dish didn't move close to target (delta={delta})"
# Move back
print(f" Returning to AZ={start_az:.2f}")
app.device.move_motor(0, start_az)
await asyncio.sleep(3.0)
pos3 = app.device.get_position()
print(f" Final: AZ={pos3['azimuth']:.2f} EL={pos3['elevation']:.2f}")
@pytest.mark.asyncio
async def test_craft_tracking_moves_real_dish():
"""Track a target via Craft API and verify the dish gets commands.
Uses the real serial device. Tracks briefly then returns
to the original position.
"""
app = BirdcageApp()
app.demo_mode = False
app.serial_port = SERIAL_PORT
app.firmware_name = "g2"
app.skip_init = True
app.craft_url = "https://space.warehack.ing"
async with app.run_test(size=(120, 40)) as pilot:
await pilot.pause()
await asyncio.sleep(1.0)
control = app.query_one("#control", ControlScreen)
control.switch_mode("craft")
await pilot.pause()
assert app.device is not None
assert app.device.is_connected
print(f" Device connected on {SERIAL_PORT}")
pos_before = app.device.get_position()
print(
f" Starting position: "
f"AZ={pos_before['azimuth']:.2f} "
f"EL={pos_before['elevation']:.2f}"
)
panel = app.query_one("#ctrl-craft-panel", CraftPanel)
# Search for something likely above horizon
panel.post_message(CraftPanel.SearchRequested("Moon"))
await asyncio.sleep(3.0)
from textual.widgets import DataTable
table = app.query_one("#craft-results-table", DataTable)
print(f" Moon search: {table.row_count} results")
if table.row_count == 0:
panel.post_message(CraftPanel.SearchRequested("Jupiter"))
await asyncio.sleep(3.0)
print(f" Jupiter search: {table.row_count} results")
if table.row_count == 0:
panel.post_message(CraftPanel.SearchRequested("Sun"))
await asyncio.sleep(3.0)
print(f" Sun search: {table.row_count} results")
assert table.row_count > 0, "No search results found"
# Read the first row
first_key = list(table.rows.keys())[0]
row = table.get_row(first_key)
target_name = row[0]
target_type = row[1]
target_id = int(row[2])
print(f" Tracking: {target_name} ({target_type}:{target_id})")
# Start tracking
panel.post_message(
CraftPanel.TrackRequested(
target_type=target_type,
target_id=target_id,
name=target_name,
min_el=18.0,
)
)
# Let the tracking loop run a few cycles
await asyncio.sleep(6.0)
status = app.query_one("#craft-tracking-status", CraftTrackingStatus)
print(
f" State: {status.state}, "
f"AZ={status.azimuth:.2f}, EL={status.elevation:.2f}, "
f"moves={status.moves}, error={status.error!r}"
)
assert status.state in ("TRACKING", "WAITING"), (
f"Unexpected state: {status.state}"
)
if status.state == "TRACKING":
assert status.moves >= 1, "No motor commands issued"
pos_after = app.device.get_position()
print(
f" Position after: "
f"AZ={pos_after['azimuth']:.2f} "
f"EL={pos_after['elevation']:.2f}"
)
daz = _az_delta(pos_after["azimuth"], pos_before["azimuth"])
del_ = abs(pos_after["elevation"] - pos_before["elevation"])
print(f" Dish moved: dAZ={daz:.2f} dEL={del_:.2f}")
else:
print(" Target below horizon or min_el -- WAITING is OK")
# Stop tracking
control._stop_craft_tracking()
await pilot.pause()
# Return to starting position
print(
f" Returning to: "
f"AZ={pos_before['azimuth']:.2f} "
f"EL={pos_before['elevation']:.2f}"
)
app.device.move_to(pos_before["azimuth"], pos_before["elevation"])
await asyncio.sleep(2.0)
print(" Integration test complete")

View File

@ -0,0 +1,259 @@
"""Test Craft mode — API-driven satellite tracking via the F2 Control screen.
Verifies search, pass predictions, tracking loop, and stop lifecycle using
mocked CraftClient methods. No real network calls.
NOTE: Uses post_message() for reliable button activation (same pattern as
test_track_mode.py).
"""
import asyncio
from unittest.mock import MagicMock
import pytest
from birdcage_tui.app import BirdcageApp
from birdcage_tui.craft_client import PassPrediction, SearchResult, TargetPosition
from birdcage_tui.screens.control import ControlScreen
from birdcage_tui.widgets.craft_panel import CraftPanel, CraftTrackingStatus
async def _switch_to_craft(pilot, app) -> None:
"""Navigate to F2 Control > Craft sub-mode."""
await pilot.press("f2")
await pilot.pause()
control = app.query_one("#control", ControlScreen)
control.switch_mode("craft")
await pilot.pause()
def _mock_search_results() -> list[SearchResult]:
return [
SearchResult(
name="ISS (ZARYA)",
target_type="satellite",
target_id="25544",
score=1.0,
groups=["stations"],
),
SearchResult(
name="NOAA 19",
target_type="satellite",
target_id="33591",
score=0.8,
groups=["weather"],
),
]
def _mock_passes() -> list[PassPrediction]:
return [
PassPrediction(
satellite_name="ISS (ZARYA)",
norad_id=25544,
aos_time="2026-02-16T08:20:00Z",
aos_az=220.0,
tca_time="2026-02-16T08:25:00Z",
tca_alt=45.3,
tca_az=180.0,
los_time="2026-02-16T08:30:00Z",
los_az=140.0,
max_elevation=45.3,
duration_seconds=600,
is_visible=True,
),
]
def _mock_visible_target() -> list[TargetPosition]:
return [
TargetPosition(
name="ISS (ZARYA)",
target_type="satellite",
target_id="25544",
azimuth=245.30,
altitude=34.20,
distance_km=420.0,
range_rate=-2.1,
is_above_horizon=True,
),
]
def _mock_below_horizon() -> list[TargetPosition]:
"""Return an empty list — target is not in the sky."""
return []
@pytest.mark.asyncio
async def test_search_populates_table():
"""Searching should populate the DataTable with results."""
app = BirdcageApp()
app.demo_mode = True
async with app.run_test(size=(120, 40)) as pilot:
await pilot.pause()
await _switch_to_craft(pilot, app)
control = app.query_one("#control", ControlScreen)
panel = app.query_one("#ctrl-craft-panel", CraftPanel)
# Mock the client
mock_client = MagicMock()
mock_client.search.return_value = _mock_search_results()
control.set_craft_client(mock_client)
# Post search request
panel.post_message(CraftPanel.SearchRequested("ISS"))
await asyncio.sleep(0.5)
# Verify table was populated
from textual.widgets import DataTable
table = app.query_one("#craft-results-table", DataTable)
assert table.row_count == 2
mock_client.search.assert_called_once_with("ISS")
@pytest.mark.asyncio
async def test_tracking_drives_device():
"""Tracking should poll positions and move the demo device."""
app = BirdcageApp()
app.demo_mode = True
async with app.run_test(size=(120, 40)) as pilot:
await pilot.pause()
await _switch_to_craft(pilot, app)
control = app.query_one("#control", ControlScreen)
panel = app.query_one("#ctrl-craft-panel", CraftPanel)
mock_client = MagicMock()
mock_client.get_visible_targets.return_value = _mock_visible_target()
control.set_craft_client(mock_client)
# Start tracking
panel.post_message(
CraftPanel.TrackRequested(
target_type="satellite",
target_id="25544",
name="ISS (ZARYA)",
min_el=18.0,
)
)
# Let the tracking loop run a couple iterations
await asyncio.sleep(2.5)
# Device target should have been updated
assert app.device._target_az == pytest.approx(245.30, abs=0.1)
assert app.device._target_el == pytest.approx(34.20, abs=0.1)
# Status should show TRACKING
status = app.query_one("#craft-tracking-status", CraftTrackingStatus)
assert status.state == "TRACKING"
assert status.moves >= 1
# Cleanup
control._stop_craft_tracking()
await pilot.pause()
@pytest.mark.asyncio
async def test_tracking_stops_cleanly():
"""Stopping tracking should return to IDLE state."""
app = BirdcageApp()
app.demo_mode = True
async with app.run_test(size=(120, 40)) as pilot:
await pilot.pause()
await _switch_to_craft(pilot, app)
control = app.query_one("#control", ControlScreen)
panel = app.query_one("#ctrl-craft-panel", CraftPanel)
mock_client = MagicMock()
mock_client.get_visible_targets.return_value = _mock_visible_target()
control.set_craft_client(mock_client)
# Start tracking
panel.post_message(
CraftPanel.TrackRequested(
target_type="satellite",
target_id="25544",
name="ISS (ZARYA)",
min_el=18.0,
)
)
await asyncio.sleep(1.5)
# Stop
panel.post_message(CraftPanel.StopTrackingRequested())
await asyncio.sleep(0.5)
status = app.query_one("#craft-tracking-status", CraftTrackingStatus)
assert status.state == "IDLE"
assert not control._craft_tracking
@pytest.mark.asyncio
async def test_below_horizon_shows_waiting():
"""Target not in sky/up results should show WAITING status."""
app = BirdcageApp()
app.demo_mode = True
async with app.run_test(size=(120, 40)) as pilot:
await pilot.pause()
await _switch_to_craft(pilot, app)
control = app.query_one("#control", ControlScreen)
panel = app.query_one("#ctrl-craft-panel", CraftPanel)
mock_client = MagicMock()
mock_client.get_visible_targets.return_value = _mock_below_horizon()
control.set_craft_client(mock_client)
# Start tracking
panel.post_message(
CraftPanel.TrackRequested(
target_type="satellite",
target_id="25544",
name="ISS (ZARYA)",
min_el=18.0,
)
)
await asyncio.sleep(1.5)
status = app.query_one("#craft-tracking-status", CraftTrackingStatus)
assert status.state == "WAITING"
# Cleanup
control._stop_craft_tracking()
await pilot.pause()
@pytest.mark.asyncio
async def test_passes_display():
"""Pass predictions should update the CraftPassInfo widget."""
app = BirdcageApp()
app.demo_mode = True
async with app.run_test(size=(120, 40)) as pilot:
await pilot.pause()
await _switch_to_craft(pilot, app)
control = app.query_one("#control", ControlScreen)
panel = app.query_one("#ctrl-craft-panel", CraftPanel)
mock_client = MagicMock()
mock_client.get_passes.return_value = _mock_passes()
control.set_craft_client(mock_client)
# Request passes
panel.post_message(CraftPanel.PassesRequested(norad_id=25544))
await asyncio.sleep(0.5)
from birdcage_tui.widgets.craft_panel import CraftPassInfo
info = app.query_one("#craft-pass-info", CraftPassInfo)
assert "45.3" in info.passes_text
mock_client.get_passes.assert_called_once_with(25544)

View File

@ -64,9 +64,7 @@ async def test_start_creates_listening_server():
await pilot.pause()
# Post StartRequested with a known port.
panel.post_message(
TrackingPanel.StartRequested("127.0.0.1", 14533, 18.0)
)
panel.post_message(TrackingPanel.StartRequested("127.0.0.1", 14533, 18.0))
await _wait_for_listening(app)
status = app.query_one("#tracking-status", TrackingStatus)
@ -74,9 +72,7 @@ async def test_start_creates_listening_server():
assert control._rotctld_server is not None
# Verify the TCP port is open.
reader, writer = await asyncio.open_connection(
"127.0.0.1", 14533
)
reader, writer = await asyncio.open_connection("127.0.0.1", 14533)
writer.close()
await writer.wait_closed()
@ -102,9 +98,7 @@ async def test_stop_shuts_down_server():
await pilot.pause()
# Start.
panel.post_message(
TrackingPanel.StartRequested("127.0.0.1", 14534, 18.0)
)
panel.post_message(TrackingPanel.StartRequested("127.0.0.1", 14534, 18.0))
await _wait_for_listening(app)
assert control._rotctld_server is not None
@ -119,9 +113,7 @@ async def test_stop_shuts_down_server():
# Port should no longer be accepting.
with pytest.raises(OSError):
_r, _w = await asyncio.open_connection(
"127.0.0.1", 14534
)
_r, _w = await asyncio.open_connection("127.0.0.1", 14534)
@pytest.mark.asyncio
@ -147,16 +139,12 @@ async def test_status_display_updates():
assert status.moves == 0
# Start server.
panel.post_message(
TrackingPanel.StartRequested("127.0.0.1", 14535, 18.0)
)
panel.post_message(TrackingPanel.StartRequested("127.0.0.1", 14535, 18.0))
await _wait_for_listening(app)
assert status.state == "LISTENING"
# Connect a client -- should transition to CONNECTED.
reader, writer = await asyncio.open_connection(
"127.0.0.1", 14535
)
reader, writer = await asyncio.open_connection("127.0.0.1", 14535)
# Send a command so the server accept loop processes it.
writer.write(b"_\n")
await writer.drain()
@ -195,25 +183,17 @@ async def test_rotctld_get_position():
control.switch_mode("track")
await pilot.pause()
panel.post_message(
TrackingPanel.StartRequested("127.0.0.1", 14536, 18.0)
)
panel.post_message(TrackingPanel.StartRequested("127.0.0.1", 14536, 18.0))
await _wait_for_listening(app)
reader, writer = await asyncio.open_connection(
"127.0.0.1", 14536
)
reader, writer = await asyncio.open_connection("127.0.0.1", 14536)
# Query position.
writer.write(b"p\n")
await writer.drain()
az_line = await asyncio.wait_for(
reader.readline(), timeout=3.0
)
el_line = await asyncio.wait_for(
reader.readline(), timeout=3.0
)
az_line = await asyncio.wait_for(reader.readline(), timeout=3.0)
el_line = await asyncio.wait_for(reader.readline(), timeout=3.0)
az = float(az_line.decode().strip())
el = float(el_line.decode().strip())
@ -244,14 +224,10 @@ async def test_rotctld_set_position():
control.switch_mode("track")
await pilot.pause()
panel.post_message(
TrackingPanel.StartRequested("127.0.0.1", 14537, 18.0)
)
panel.post_message(TrackingPanel.StartRequested("127.0.0.1", 14537, 18.0))
await _wait_for_listening(app)
reader, writer = await asyncio.open_connection(
"127.0.0.1", 14537
)
reader, writer = await asyncio.open_connection("127.0.0.1", 14537)
# Move to a specific position.
writer.write(b"P 200.0 55.0\n")
@ -297,14 +273,10 @@ async def test_rotctld_quit_command():
control.switch_mode("track")
await pilot.pause()
panel.post_message(
TrackingPanel.StartRequested("127.0.0.1", 14538, 18.0)
)
panel.post_message(TrackingPanel.StartRequested("127.0.0.1", 14538, 18.0))
await _wait_for_listening(app)
reader, writer = await asyncio.open_connection(
"127.0.0.1", 14538
)
reader, writer = await asyncio.open_connection("127.0.0.1", 14538)
writer.write(b"q\n")
await writer.drain()