Compare commits
6 Commits
7035d814a1
...
b98a5482fa
| Author | SHA1 | Date | |
|---|---|---|---|
| b98a5482fa | |||
| b0aee4e5a6 | |||
| f8bfd69ceb | |||
| 8a6b99bd8c | |||
| 16ca4892b3 | |||
| 3013eeee4c |
72
CLAUDE.md
72
CLAUDE.md
@ -739,3 +739,75 @@ ssh -A warehack-ing@warehack.ing "cd birdcage-docs && git pull && make prod"
|
||||
```
|
||||
|
||||
TLS is automatic via caddy-docker-proxy (ACME + Vultr DNS challenge). New subdomains take ~2 minutes for certificate issuance.
|
||||
|
||||
### Screenshots
|
||||
|
||||
TUI screenshots live in `site/public/screenshots/` (the site repo, not the main repo).
|
||||
An automated capture script uses Textual's Pilot API to render each screen in demo mode.
|
||||
|
||||
**Regenerating all screenshots:**
|
||||
|
||||
```bash
|
||||
cd tui && uv run python scripts/capture_screenshots.py
|
||||
```
|
||||
|
||||
This captures 8 SVG + 8 PNG screenshots plus a 2×3 collage:
|
||||
- `tui-dashboard` — F1 action cards
|
||||
- `tui-control` — F2 Manual mode with compass rose
|
||||
- `tui-craft-search` — F2 Craft mode with satellite search results
|
||||
- `tui-craft-tracking` — F2 Craft mode tracking the Moon
|
||||
- `tui-signal` — F3 Monitor mode with RSSI gauge and receiver info
|
||||
- `tui-system` — F4 Hardware mode with firmware ID and A3981 diagnostics
|
||||
- `tui-console` — F5 overlay with serial console
|
||||
- `tui-camera` — F6 overlay with capture triggers
|
||||
- `tui-collage` — 2×3 montage of 6 main screens (via ImageMagick `montage`)
|
||||
|
||||
**Dependencies:** `rsvg-convert` (SVG→PNG), `montage` (collage).
|
||||
|
||||
**When to regenerate:** After any UI change that affects widget layout, button labels,
|
||||
or screen content. The script populates Craft search results and tracking state
|
||||
directly via widget API for deterministic screenshots.
|
||||
|
||||
## MCP Server
|
||||
|
||||
| Property | Value |
|
||||
|----------|-------|
|
||||
| Package | `birdcage-mcp` |
|
||||
| Source | `mcp/src/birdcage_mcp/` |
|
||||
| Entry point | `birdcage_mcp.server:main` |
|
||||
| Tools | 35 (connection, movement, signal, system, satellite, console) |
|
||||
| Resources | 5 (`birdcage://config`, `position`, `firmware`, `motor-dynamics`, `el-limits`) |
|
||||
| Prompts | 3 (`setup_wizard`, `satellite_tracking_guide`, `rf_sweep_guide`) |
|
||||
| Tests | `mcp/tests/` — 49 tests against DemoDevice via `run_server_async` |
|
||||
|
||||
### Running
|
||||
|
||||
```bash
|
||||
# Demo mode (no hardware)
|
||||
BIRDCAGE_DEMO=1 uv run --directory mcp birdcage-mcp
|
||||
|
||||
# Hardware mode
|
||||
BIRDCAGE_PORT=/dev/ttyUSB2 uv run --directory mcp birdcage-mcp
|
||||
```
|
||||
|
||||
### Adding to Claude Code
|
||||
|
||||
```bash
|
||||
claude mcp add birdcage-mcp -- env BIRDCAGE_DEMO=1 uv run --directory mcp birdcage-mcp
|
||||
```
|
||||
|
||||
### Environment Variables
|
||||
|
||||
| Variable | Default | Purpose |
|
||||
|----------|---------|---------|
|
||||
| `BIRDCAGE_DEMO` | `false` | Enable demo mode (DemoDevice + DemoCraftClient) |
|
||||
| `BIRDCAGE_PORT` | `/dev/ttyUSB0` | Serial port for hardware mode |
|
||||
| `BIRDCAGE_FIRMWARE` | `g2` | Firmware variant (`g2`, `hal205`, etc.) |
|
||||
| `BIRDCAGE_CRAFT_URL` | `https://space.warehack.ing` | Orbital prediction API |
|
||||
|
||||
### Testing
|
||||
|
||||
```bash
|
||||
cd mcp && uv run pytest tests/ # 49 tests via FastMCP run_server_async
|
||||
uv run ruff check src/ # Lint
|
||||
```
|
||||
|
||||
34
mcp/pyproject.toml
Normal file
34
mcp/pyproject.toml
Normal file
@ -0,0 +1,34 @@
|
||||
[build-system]
|
||||
requires = ["hatchling"]
|
||||
build-backend = "hatchling.build"
|
||||
|
||||
[project]
|
||||
name = "birdcage-mcp"
|
||||
version = "2026.02.17"
|
||||
description = "FastMCP server for Winegard satellite dish control"
|
||||
license = "MIT"
|
||||
requires-python = ">=3.11"
|
||||
authors = [{name = "Ryan Malloy", email = "ryan@supported.systems"}]
|
||||
dependencies = [
|
||||
"birdcage",
|
||||
"fastmcp>=2.0",
|
||||
]
|
||||
|
||||
[project.scripts]
|
||||
birdcage-mcp = "birdcage_mcp.server:main"
|
||||
|
||||
[tool.uv.sources]
|
||||
birdcage = { path = "..", editable = true }
|
||||
|
||||
[tool.ruff]
|
||||
target-version = "py311"
|
||||
src = ["src"]
|
||||
|
||||
[tool.ruff.lint]
|
||||
select = ["E", "F", "I", "UP", "B", "SIM"]
|
||||
|
||||
[tool.hatch.build.targets.wheel]
|
||||
packages = ["src/birdcage_mcp"]
|
||||
|
||||
[dependency-groups]
|
||||
dev = ["pytest>=9.0.2", "pytest-asyncio>=1.3.0"]
|
||||
1
mcp/src/birdcage_mcp/__init__.py
Normal file
1
mcp/src/birdcage_mcp/__init__.py
Normal file
@ -0,0 +1 @@
|
||||
"""birdcage-mcp: FastMCP server for Winegard satellite dish control."""
|
||||
78
mcp/src/birdcage_mcp/prompts.py
Normal file
78
mcp/src/birdcage_mcp/prompts.py
Normal file
@ -0,0 +1,78 @@
|
||||
"""MCP prompts — guided workflows for common dish operations."""
|
||||
|
||||
|
||||
def register(mcp):
|
||||
"""Register MCP prompts on the FastMCP instance."""
|
||||
|
||||
@mcp.prompt()
|
||||
async def setup_wizard() -> str:
|
||||
"""Step-by-step guide to connect and initialize the satellite dish.
|
||||
|
||||
Walks through: check status -> connect to serial port -> verify
|
||||
firmware -> home motors -> confirm position.
|
||||
"""
|
||||
return (
|
||||
"Follow these steps to set up the satellite dish:\n\n"
|
||||
"1. Call `status` to check the current connection state.\n"
|
||||
"2. If not connected, call `connect` with the correct port "
|
||||
"and firmware variant.\n"
|
||||
"3. Call `get_firmware_id` to verify the firmware version.\n"
|
||||
"4. Call `home_motor` with motor_id=0 (AZ) then motor_id=1 (EL) "
|
||||
"to establish reference positions.\n"
|
||||
"5. Call `get_position` to verify the dish is at the expected "
|
||||
"home coordinates.\n"
|
||||
"6. Call `get_el_limits` to confirm the elevation operating range.\n\n"
|
||||
"The dish is now ready for tracking or manual positioning."
|
||||
)
|
||||
|
||||
@mcp.prompt()
|
||||
async def satellite_tracking_guide() -> str:
|
||||
"""Guide for tracking a satellite pass with the dish.
|
||||
|
||||
Walks through: search -> get passes -> wait for AOS -> poll
|
||||
position at 1 Hz -> move dish -> detect LOS.
|
||||
"""
|
||||
return (
|
||||
"Follow these steps to track a satellite:\n\n"
|
||||
"1. Call `search_satellites` with the satellite name "
|
||||
"(e.g. 'ISS', 'NOAA 19').\n"
|
||||
"2. Note the NORAD ID from the search results.\n"
|
||||
"3. Call `get_passes` with the NORAD ID to see upcoming passes.\n"
|
||||
"4. Choose a pass with good max elevation (>30 deg is ideal).\n"
|
||||
"5. When the pass is about to start (AOS time), begin a tracking "
|
||||
"loop:\n"
|
||||
" a. Call `get_visible_targets` to get the satellite's current "
|
||||
"AZ/EL.\n"
|
||||
" b. Call `move_to` with those coordinates.\n"
|
||||
" c. Wait ~1 second, then repeat from (a).\n"
|
||||
"6. Stop tracking when the satellite drops below your minimum "
|
||||
"elevation.\n\n"
|
||||
"TIP: Enable the LNA first with `enable_lna` if you want to "
|
||||
"measure signal strength during the pass."
|
||||
)
|
||||
|
||||
@mcp.prompt()
|
||||
async def rf_sweep_guide() -> str:
|
||||
"""Guide for performing an RF sky sweep with the dish.
|
||||
|
||||
Walks through: enable LNA -> baseline RSSI -> configure sweep
|
||||
-> run az_sweep -> analyze results.
|
||||
"""
|
||||
return (
|
||||
"Follow these steps for an RF sky sweep:\n\n"
|
||||
"1. Call `enable_lna` to power the low-noise amplifier.\n"
|
||||
"2. Call `get_rssi` to establish a baseline noise floor.\n"
|
||||
"3. Decide your sweep parameters:\n"
|
||||
" - start_az: Starting azimuth\n"
|
||||
" - span: How many degrees to sweep\n"
|
||||
" - step_cdeg: Resolution in centidegrees (100 = 1 deg)\n"
|
||||
" - num_xponders: Transponders per position (1 for basic)\n"
|
||||
"4. Call `az_sweep` with those parameters.\n"
|
||||
"5. Analyze the returned points:\n"
|
||||
" - Look for RSSI peaks above the noise floor (~500)\n"
|
||||
" - Check lock=1 points for strong signals\n"
|
||||
" - SNR values indicate signal quality\n\n"
|
||||
"For a 2D sky map, repeat the sweep at different elevations "
|
||||
"(e.g. EL 20 to 60 in 5 deg steps) using `move_motor` to "
|
||||
"set each elevation before sweeping."
|
||||
)
|
||||
57
mcp/src/birdcage_mcp/resources.py
Normal file
57
mcp/src/birdcage_mcp/resources.py
Normal file
@ -0,0 +1,57 @@
|
||||
"""MCP resources — live data endpoints for dish state."""
|
||||
|
||||
import json
|
||||
|
||||
from fastmcp import Context
|
||||
|
||||
|
||||
def register(mcp):
|
||||
"""Register MCP resources on the FastMCP instance."""
|
||||
|
||||
@mcp.resource("birdcage://config")
|
||||
async def config_resource(ctx: Context) -> str:
|
||||
"""Current connection configuration."""
|
||||
state = ctx.request_context.lifespan_context
|
||||
data = {
|
||||
"demo_mode": state.demo_mode,
|
||||
"serial_port": state.serial_port,
|
||||
"firmware": state.firmware_name,
|
||||
"connected": state.connected,
|
||||
}
|
||||
return json.dumps(data, indent=2)
|
||||
|
||||
@mcp.resource("birdcage://position")
|
||||
async def position_resource(ctx: Context) -> str:
|
||||
"""Live dish position (queries hardware)."""
|
||||
state = ctx.request_context.lifespan_context
|
||||
if state.device is None:
|
||||
return json.dumps({"error": "not connected"})
|
||||
pos = state.device.get_position()
|
||||
return json.dumps(pos, indent=2)
|
||||
|
||||
@mcp.resource("birdcage://firmware")
|
||||
async def firmware_resource(ctx: Context) -> str:
|
||||
"""Firmware identification string."""
|
||||
state = ctx.request_context.lifespan_context
|
||||
if state.device is None:
|
||||
return json.dumps({"error": "not connected"})
|
||||
fw_id = state.device.get_firmware_id()
|
||||
return fw_id
|
||||
|
||||
@mcp.resource("birdcage://motor-dynamics")
|
||||
async def motor_dynamics_resource(ctx: Context) -> str:
|
||||
"""Motor velocity and acceleration limits."""
|
||||
state = ctx.request_context.lifespan_context
|
||||
if state.device is None:
|
||||
return json.dumps({"error": "not connected"})
|
||||
dynamics = state.device.get_motor_dynamics()
|
||||
return json.dumps(dynamics, indent=2)
|
||||
|
||||
@mcp.resource("birdcage://el-limits")
|
||||
async def el_limits_resource(ctx: Context) -> str:
|
||||
"""Elevation angle limits (min, max, home)."""
|
||||
state = ctx.request_context.lifespan_context
|
||||
if state.device is None:
|
||||
return json.dumps({"error": "not connected"})
|
||||
limits = state.device.get_el_limits()
|
||||
return json.dumps(limits, indent=2)
|
||||
92
mcp/src/birdcage_mcp/server.py
Normal file
92
mcp/src/birdcage_mcp/server.py
Normal file
@ -0,0 +1,92 @@
|
||||
"""Birdcage MCP server — FastMCP entry point with lifespan management."""
|
||||
|
||||
import logging
|
||||
import os
|
||||
import sys
|
||||
from contextlib import asynccontextmanager
|
||||
|
||||
from fastmcp import FastMCP
|
||||
|
||||
from birdcage_mcp.state import BirdcageState
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@asynccontextmanager
|
||||
async def lifespan(server: FastMCP):
|
||||
"""Server lifespan: configure state from environment, optionally pre-connect."""
|
||||
state = BirdcageState()
|
||||
|
||||
demo_env = os.environ.get("BIRDCAGE_DEMO", "false").lower()
|
||||
state.demo_mode = demo_env in ("1", "true", "yes")
|
||||
state.serial_port = os.environ.get("BIRDCAGE_PORT", "/dev/ttyUSB0")
|
||||
state.firmware_name = os.environ.get("BIRDCAGE_FIRMWARE", "g2")
|
||||
craft_url = os.environ.get(
|
||||
"BIRDCAGE_CRAFT_URL", "https://space.warehack.ing"
|
||||
)
|
||||
|
||||
if state.demo_mode:
|
||||
from birdcage.demo import DemoCraftClient, DemoDevice
|
||||
|
||||
device = DemoDevice()
|
||||
device.connect()
|
||||
device.initialize()
|
||||
state.device = device
|
||||
state.craft_client = DemoCraftClient()
|
||||
state.connected = True
|
||||
logger.info("Demo mode: DemoDevice + DemoCraftClient ready")
|
||||
else:
|
||||
from birdcage.craft_client import CraftClient
|
||||
|
||||
state.craft_client = CraftClient(base_url=craft_url)
|
||||
logger.info(
|
||||
"Hardware mode: use 'connect' tool to open serial port %s",
|
||||
state.serial_port,
|
||||
)
|
||||
|
||||
try:
|
||||
yield state
|
||||
finally:
|
||||
if state.device is not None and hasattr(state.device, "disconnect"):
|
||||
try:
|
||||
state.device.disconnect()
|
||||
except Exception:
|
||||
logger.debug("Disconnect error during shutdown", exc_info=True)
|
||||
logger.info("Birdcage MCP server shut down")
|
||||
|
||||
|
||||
mcp = FastMCP(
|
||||
"birdcage",
|
||||
instructions=(
|
||||
"Control a Winegard satellite dish (Carryout G2 / Trav'ler) for "
|
||||
"amateur radio satellite tracking. Provides motor control, signal "
|
||||
"measurement, firmware management, and orbital tracking via the "
|
||||
"Craft API."
|
||||
),
|
||||
lifespan=lifespan,
|
||||
)
|
||||
|
||||
# Register tool modules
|
||||
from birdcage_mcp import prompts, resources # noqa: E402, F401
|
||||
from birdcage_mcp.tools import ( # noqa: E402
|
||||
connection,
|
||||
console,
|
||||
movement,
|
||||
satellite,
|
||||
signal,
|
||||
system,
|
||||
)
|
||||
|
||||
connection.register(mcp)
|
||||
movement.register(mcp)
|
||||
signal.register(mcp)
|
||||
system.register(mcp)
|
||||
satellite.register(mcp)
|
||||
console.register(mcp)
|
||||
resources.register(mcp)
|
||||
prompts.register(mcp)
|
||||
|
||||
|
||||
def main():
|
||||
print("birdcage-mcp v2026.02.17", file=sys.stderr)
|
||||
mcp.run()
|
||||
29
mcp/src/birdcage_mcp/state.py
Normal file
29
mcp/src/birdcage_mcp/state.py
Normal file
@ -0,0 +1,29 @@
|
||||
"""Session state for the Birdcage MCP server."""
|
||||
|
||||
from dataclasses import dataclass, field
|
||||
|
||||
|
||||
@dataclass
|
||||
class BirdcageState:
|
||||
"""Mutable session state shared across all MCP tools.
|
||||
|
||||
Populated by the server lifespan and mutated by the ``connect``
|
||||
and ``disconnect`` tools. Device and craft_client are typed as
|
||||
``object`` to accept both real (SerialBridge/CraftClient) and
|
||||
demo (DemoDevice/DemoCraftClient) implementations without
|
||||
importing them at definition time.
|
||||
"""
|
||||
|
||||
device: object = None
|
||||
craft_client: object = None
|
||||
demo_mode: bool = False
|
||||
serial_port: str = ""
|
||||
firmware_name: str = "g2"
|
||||
connected: bool = False
|
||||
_errors: list[str] = field(default_factory=list)
|
||||
|
||||
def record_error(self, msg: str) -> None:
|
||||
"""Append an error message (capped at 50 entries)."""
|
||||
self._errors.append(msg)
|
||||
if len(self._errors) > 50:
|
||||
self._errors = self._errors[-50:]
|
||||
1
mcp/src/birdcage_mcp/tools/__init__.py
Normal file
1
mcp/src/birdcage_mcp/tools/__init__.py
Normal file
@ -0,0 +1 @@
|
||||
"""Birdcage MCP tool modules."""
|
||||
112
mcp/src/birdcage_mcp/tools/connection.py
Normal file
112
mcp/src/birdcage_mcp/tools/connection.py
Normal file
@ -0,0 +1,112 @@
|
||||
"""Connection management tools — connect, disconnect, status."""
|
||||
|
||||
from fastmcp import Context
|
||||
|
||||
from birdcage_mcp.state import BirdcageState
|
||||
|
||||
|
||||
def register(mcp):
|
||||
"""Register connection tools on the FastMCP instance."""
|
||||
|
||||
@mcp.tool()
|
||||
async def connect(
|
||||
ctx: Context,
|
||||
port: str = "",
|
||||
firmware: str = "",
|
||||
skip_init: bool = False,
|
||||
) -> dict:
|
||||
"""Connect to a Winegard satellite dish via RS-422/RS-485 serial.
|
||||
|
||||
In demo mode this is a no-op (already connected). In hardware mode,
|
||||
opens the serial port and optionally runs the firmware initialization
|
||||
sequence (kills TV satellite search, enters motor submenu).
|
||||
|
||||
Args:
|
||||
port: Serial port path (e.g. /dev/ttyUSB0). Defaults to
|
||||
BIRDCAGE_PORT env var.
|
||||
firmware: Firmware variant: "g2", "hal205", or "hal000".
|
||||
Defaults to BIRDCAGE_FIRMWARE env var.
|
||||
skip_init: If True, skip firmware init (useful for
|
||||
reconnecting to a running dish).
|
||||
"""
|
||||
state: BirdcageState = ctx.request_context.lifespan_context
|
||||
|
||||
if state.demo_mode:
|
||||
return {
|
||||
"status": "connected",
|
||||
"mode": "demo",
|
||||
"message": "Demo device already connected",
|
||||
}
|
||||
|
||||
if state.connected and state.device is not None:
|
||||
return {
|
||||
"status": "already_connected",
|
||||
"port": state.serial_port,
|
||||
}
|
||||
|
||||
serial_port = port or state.serial_port
|
||||
fw = firmware or state.firmware_name
|
||||
|
||||
from birdcage.bridge import SerialBridge
|
||||
from birdcage.protocol import get_protocol
|
||||
|
||||
protocol = get_protocol(fw)
|
||||
device = SerialBridge(protocol)
|
||||
device.connect(serial_port)
|
||||
|
||||
if not skip_init:
|
||||
device.initialize()
|
||||
|
||||
state.device = device
|
||||
state.serial_port = serial_port
|
||||
state.firmware_name = fw
|
||||
state.connected = True
|
||||
|
||||
return {
|
||||
"status": "connected",
|
||||
"port": serial_port,
|
||||
"firmware": fw,
|
||||
"initialized": not skip_init,
|
||||
}
|
||||
|
||||
@mcp.tool()
|
||||
async def disconnect(ctx: Context) -> dict:
|
||||
"""Disconnect from the satellite dish and release the serial port.
|
||||
|
||||
In demo mode this is a no-op. In hardware mode, returns to the
|
||||
root menu and closes the serial connection cleanly.
|
||||
"""
|
||||
state: BirdcageState = ctx.request_context.lifespan_context
|
||||
|
||||
if state.demo_mode:
|
||||
return {"status": "demo", "message": "Demo mode — nothing to disconnect"}
|
||||
|
||||
if state.device is None or not state.connected:
|
||||
return {"status": "not_connected"}
|
||||
|
||||
state.device.disconnect()
|
||||
state.device = None
|
||||
state.connected = False
|
||||
|
||||
return {"status": "disconnected"}
|
||||
|
||||
@mcp.tool()
|
||||
async def status(ctx: Context) -> dict:
|
||||
"""Get current connection status and configuration.
|
||||
|
||||
Returns whether the dish is connected, the serial port, firmware
|
||||
variant, demo mode flag, and current menu state (if connected).
|
||||
"""
|
||||
state: BirdcageState = ctx.request_context.lifespan_context
|
||||
|
||||
result = {
|
||||
"connected": state.connected,
|
||||
"demo_mode": state.demo_mode,
|
||||
"serial_port": state.serial_port,
|
||||
"firmware": state.firmware_name,
|
||||
}
|
||||
|
||||
if state.device is not None and hasattr(state.device, "current_menu"):
|
||||
result["current_menu"] = state.device.current_menu
|
||||
|
||||
return result
|
||||
49
mcp/src/birdcage_mcp/tools/console.py
Normal file
49
mcp/src/birdcage_mcp/tools/console.py
Normal file
@ -0,0 +1,49 @@
|
||||
"""Raw console tool — safety-gated direct firmware access."""
|
||||
|
||||
from fastmcp import Context
|
||||
|
||||
from birdcage_mcp.state import BirdcageState
|
||||
|
||||
# Commands that are dangerous at the root menu level.
|
||||
_BLOCKED_ROOT_COMMANDS = {"q"}
|
||||
|
||||
|
||||
def register(mcp):
|
||||
"""Register console tools on the FastMCP instance."""
|
||||
|
||||
@mcp.tool()
|
||||
async def send_raw_command(ctx: Context, command: str) -> dict:
|
||||
"""Send a raw command to the firmware console and return the response.
|
||||
|
||||
This provides direct access to the Winegard firmware shell. The
|
||||
command is sent as-is and the raw response is returned.
|
||||
|
||||
SAFETY: The 'q' command at root menu is BLOCKED — it kills the
|
||||
UART shell and requires a hardware power cycle to recover.
|
||||
|
||||
HAZARDOUS COMMANDS (use with caution):
|
||||
- ADC 'scan' without arguments on uncalibrated AZ: deadlocks shell
|
||||
- NVS 'e <idx> <value>': writes to non-volatile storage
|
||||
- 'reboot': reboots the microcontroller
|
||||
- 'stow': folds dish flat (modified feeds may not survive)
|
||||
|
||||
After a raw command, the menu state is marked as unknown.
|
||||
|
||||
Args:
|
||||
command: The firmware command string to send.
|
||||
"""
|
||||
state: BirdcageState = ctx.request_context.lifespan_context
|
||||
if state.device is None:
|
||||
raise RuntimeError("No device connected. Use the 'connect' tool first.")
|
||||
|
||||
cmd_stripped = command.strip().lower()
|
||||
if cmd_stripped in _BLOCKED_ROOT_COMMANDS:
|
||||
return {
|
||||
"error": f"Command '{command}' is blocked for safety. "
|
||||
"The 'q' command kills the UART shell and requires "
|
||||
"a hardware power cycle to recover.",
|
||||
"blocked": True,
|
||||
}
|
||||
|
||||
raw = state.device.send_raw(command)
|
||||
return {"response": raw}
|
||||
154
mcp/src/birdcage_mcp/tools/movement.py
Normal file
154
mcp/src/birdcage_mcp/tools/movement.py
Normal file
@ -0,0 +1,154 @@
|
||||
"""Movement tools — motor positioning, homing, and engagement."""
|
||||
|
||||
from fastmcp import Context
|
||||
|
||||
from birdcage_mcp.state import BirdcageState
|
||||
|
||||
|
||||
def _require_device(state: BirdcageState):
|
||||
"""Raise if no device is connected."""
|
||||
if state.device is None:
|
||||
raise RuntimeError(
|
||||
"No device connected. Use the 'connect' tool first."
|
||||
)
|
||||
|
||||
|
||||
def register(mcp):
|
||||
"""Register movement tools on the FastMCP instance."""
|
||||
|
||||
@mcp.tool()
|
||||
async def get_position(ctx: Context) -> dict:
|
||||
"""Query the current azimuth and elevation of the dish.
|
||||
|
||||
Returns {"azimuth": float, "elevation": float} in degrees.
|
||||
Azimuth range is 0-455 (before cable wrap). Elevation range
|
||||
depends on firmware variant (typically 18-65 deg for Carryout G2).
|
||||
"""
|
||||
state: BirdcageState = ctx.request_context.lifespan_context
|
||||
_require_device(state)
|
||||
return state.device.get_position()
|
||||
|
||||
@mcp.tool()
|
||||
async def move_to(
|
||||
ctx: Context,
|
||||
azimuth: float,
|
||||
elevation: float,
|
||||
) -> dict:
|
||||
"""Move the dish to an absolute azimuth and elevation.
|
||||
|
||||
Both motors are commanded sequentially (AZ then EL). The firmware
|
||||
queues commands, so this returns immediately — the dish moves in
|
||||
the background. Poll get_position to track progress.
|
||||
|
||||
Args:
|
||||
azimuth: Target azimuth in degrees (0-455).
|
||||
elevation: Target elevation in degrees (18-65 for Carryout G2).
|
||||
"""
|
||||
state: BirdcageState = ctx.request_context.lifespan_context
|
||||
_require_device(state)
|
||||
state.device.move_to(azimuth, elevation)
|
||||
return {
|
||||
"status": "moving",
|
||||
"target_azimuth": azimuth,
|
||||
"target_elevation": elevation,
|
||||
}
|
||||
|
||||
@mcp.tool()
|
||||
async def move_motor(
|
||||
ctx: Context,
|
||||
motor_id: int,
|
||||
degrees: float,
|
||||
) -> dict:
|
||||
"""Move a single motor to an absolute position.
|
||||
|
||||
Args:
|
||||
motor_id: 0 for azimuth, 1 for elevation.
|
||||
degrees: Target angle in degrees.
|
||||
"""
|
||||
state: BirdcageState = ctx.request_context.lifespan_context
|
||||
_require_device(state)
|
||||
if motor_id not in (0, 1):
|
||||
raise ValueError("motor_id must be 0 (azimuth) or 1 (elevation)")
|
||||
state.device.move_motor(motor_id, degrees)
|
||||
axis = "azimuth" if motor_id == 0 else "elevation"
|
||||
return {"status": "moving", "axis": axis, "target": degrees}
|
||||
|
||||
@mcp.tool()
|
||||
async def home_motor(ctx: Context, motor_id: int) -> dict:
|
||||
"""Home a motor to its reference position using stall detection.
|
||||
|
||||
The dish uses motor stalling (not limit switches) to find the
|
||||
mechanical home position. Expect audible grinding during homing.
|
||||
|
||||
Args:
|
||||
motor_id: 0 for azimuth, 1 for elevation.
|
||||
"""
|
||||
state: BirdcageState = ctx.request_context.lifespan_context
|
||||
_require_device(state)
|
||||
if motor_id not in (0, 1):
|
||||
raise ValueError("motor_id must be 0 (azimuth) or 1 (elevation)")
|
||||
state.device.home_motor(motor_id)
|
||||
axis = "azimuth" if motor_id == 0 else "elevation"
|
||||
return {"status": "homing", "axis": axis}
|
||||
|
||||
@mcp.tool()
|
||||
async def engage_motors(ctx: Context) -> dict:
|
||||
"""Energize the stepper motors (apply holding torque).
|
||||
|
||||
Motors must be engaged before movement commands will work.
|
||||
They draw power while engaged even when stationary.
|
||||
"""
|
||||
state: BirdcageState = ctx.request_context.lifespan_context
|
||||
_require_device(state)
|
||||
state.device.engage()
|
||||
return {"status": "engaged"}
|
||||
|
||||
@mcp.tool()
|
||||
async def release_motors(ctx: Context) -> dict:
|
||||
"""De-energize the stepper motors (remove holding torque).
|
||||
|
||||
The dish will be free to move under wind/gravity load.
|
||||
Use before manual repositioning or to save power.
|
||||
"""
|
||||
state: BirdcageState = ctx.request_context.lifespan_context
|
||||
_require_device(state)
|
||||
state.device.release()
|
||||
return {"status": "released"}
|
||||
|
||||
@mcp.tool()
|
||||
async def stow(ctx: Context) -> dict:
|
||||
"""Move the dish to stow position (AZ=0, EL=65).
|
||||
|
||||
Stow position is safe for transport with the standard feed horn.
|
||||
WARNING: Modified feeds may not survive the stow movement.
|
||||
"""
|
||||
state: BirdcageState = ctx.request_context.lifespan_context
|
||||
_require_device(state)
|
||||
state.device.move_to(0.0, 65.0)
|
||||
return {
|
||||
"status": "stowing",
|
||||
"target_azimuth": 0.0,
|
||||
"target_elevation": 65.0,
|
||||
}
|
||||
|
||||
@mcp.tool()
|
||||
async def get_step_positions(ctx: Context) -> dict:
|
||||
"""Read raw stepper motor positions in step counts.
|
||||
|
||||
Returns {"az_steps": int, "el_steps": int}. The Carryout G2 has
|
||||
40000 steps/rev for AZ and 24960 steps/rev for EL.
|
||||
"""
|
||||
state: BirdcageState = ctx.request_context.lifespan_context
|
||||
_require_device(state)
|
||||
return state.device.get_step_positions()
|
||||
|
||||
@mcp.tool()
|
||||
async def get_el_limits(ctx: Context) -> dict:
|
||||
"""Read the firmware elevation angle limits.
|
||||
|
||||
Returns {"min": float, "max": float, "home": float} in degrees.
|
||||
The Carryout G2 defaults are min=18, max=65, home=65.
|
||||
"""
|
||||
state: BirdcageState = ctx.request_context.lifespan_context
|
||||
_require_device(state)
|
||||
return state.device.get_el_limits()
|
||||
102
mcp/src/birdcage_mcp/tools/satellite.py
Normal file
102
mcp/src/birdcage_mcp/tools/satellite.py
Normal file
@ -0,0 +1,102 @@
|
||||
"""Satellite tracking tools — search, passes, visible targets."""
|
||||
|
||||
from dataclasses import asdict
|
||||
|
||||
from fastmcp import Context
|
||||
|
||||
from birdcage_mcp.state import BirdcageState
|
||||
|
||||
|
||||
def _require_craft(state: BirdcageState):
|
||||
if state.craft_client is None:
|
||||
raise RuntimeError("No Craft API client configured.")
|
||||
|
||||
|
||||
def register(mcp):
|
||||
"""Register satellite tools on the FastMCP instance."""
|
||||
|
||||
@mcp.tool()
|
||||
async def search_satellites(
|
||||
ctx: Context,
|
||||
query: str,
|
||||
limit: int = 20,
|
||||
) -> dict:
|
||||
"""Search the orbital object catalog for satellites, planets, and stars.
|
||||
|
||||
Uses the Craft API (space.warehack.ing) to find objects by name.
|
||||
Returns matches with name, type, NORAD ID (for satellites), and
|
||||
current altitude if above horizon.
|
||||
|
||||
Args:
|
||||
query: Search term (e.g. "ISS", "NOAA", "Moon").
|
||||
limit: Maximum results to return.
|
||||
"""
|
||||
state: BirdcageState = ctx.request_context.lifespan_context
|
||||
_require_craft(state)
|
||||
results = state.craft_client.search(query, limit)
|
||||
return {
|
||||
"results": [asdict(r) for r in results],
|
||||
"count": len(results),
|
||||
}
|
||||
|
||||
@mcp.tool()
|
||||
async def get_passes(
|
||||
ctx: Context,
|
||||
norad_id: int,
|
||||
hours: int = 24,
|
||||
) -> dict:
|
||||
"""Get upcoming pass predictions for a satellite.
|
||||
|
||||
Returns AOS/TCA/LOS times, azimuths, max elevation, duration,
|
||||
and visibility for each predicted pass.
|
||||
|
||||
Args:
|
||||
norad_id: NORAD catalog number (e.g. 25544 for ISS).
|
||||
hours: How many hours ahead to predict (default 24).
|
||||
"""
|
||||
state: BirdcageState = ctx.request_context.lifespan_context
|
||||
_require_craft(state)
|
||||
passes = state.craft_client.get_passes(norad_id, hours)
|
||||
return {
|
||||
"passes": [asdict(p) for p in passes],
|
||||
"count": len(passes),
|
||||
}
|
||||
|
||||
@mcp.tool()
|
||||
async def get_next_pass(ctx: Context, norad_id: int) -> dict:
|
||||
"""Get the next upcoming pass for a satellite.
|
||||
|
||||
Returns the single soonest pass prediction, or null if none
|
||||
are predicted in the near future.
|
||||
|
||||
Args:
|
||||
norad_id: NORAD catalog number (e.g. 25544 for ISS).
|
||||
"""
|
||||
state: BirdcageState = ctx.request_context.lifespan_context
|
||||
_require_craft(state)
|
||||
p = state.craft_client.get_next_pass(norad_id)
|
||||
if p is None:
|
||||
return {"pass": None}
|
||||
return {"pass": asdict(p)}
|
||||
|
||||
@mcp.tool()
|
||||
async def get_visible_targets(
|
||||
ctx: Context,
|
||||
min_alt: float = 0.0,
|
||||
) -> dict:
|
||||
"""Get all objects currently above the horizon with real-time positions.
|
||||
|
||||
Returns satellites, planets, and stars with AZ/EL computed
|
||||
server-side. This is the primary tracking endpoint — use it
|
||||
to find what's overhead right now.
|
||||
|
||||
Args:
|
||||
min_alt: Minimum altitude in degrees (default 0 = horizon).
|
||||
"""
|
||||
state: BirdcageState = ctx.request_context.lifespan_context
|
||||
_require_craft(state)
|
||||
targets = state.craft_client.get_visible_targets(min_alt)
|
||||
return {
|
||||
"targets": [asdict(t) for t in targets],
|
||||
"count": len(targets),
|
||||
}
|
||||
150
mcp/src/birdcage_mcp/tools/signal.py
Normal file
150
mcp/src/birdcage_mcp/tools/signal.py
Normal file
@ -0,0 +1,150 @@
|
||||
"""Signal measurement tools — RSSI, DVB config, LNA, and AZ sweep."""
|
||||
|
||||
from fastmcp import Context
|
||||
|
||||
from birdcage_mcp.state import BirdcageState
|
||||
|
||||
|
||||
def _require_device(state: BirdcageState):
|
||||
if state.device is None:
|
||||
raise RuntimeError("No device connected. Use the 'connect' tool first.")
|
||||
|
||||
|
||||
def register(mcp):
|
||||
"""Register signal tools on the FastMCP instance."""
|
||||
|
||||
@mcp.tool()
|
||||
async def get_rssi(ctx: Context, iterations: int = 10) -> dict:
|
||||
"""Read averaged RSSI signal strength from the BCM4515 DVB tuner.
|
||||
|
||||
Returns {"reads": int, "average": int, "current": int}.
|
||||
Noise floor is approximately 500 ADC counts.
|
||||
|
||||
Args:
|
||||
iterations: Number of samples to average (default 10).
|
||||
"""
|
||||
state: BirdcageState = ctx.request_context.lifespan_context
|
||||
_require_device(state)
|
||||
return state.device.get_rssi(iterations)
|
||||
|
||||
@mcp.tool()
|
||||
async def get_adc_rssi(ctx: Context) -> dict:
|
||||
"""Read single-shot ADC RSSI value (raw, unbounded).
|
||||
|
||||
Returns the raw ADC count from the hardware ADC subsystem,
|
||||
bypassing the DVB tuner's averaging. Useful for quick signal checks.
|
||||
"""
|
||||
state: BirdcageState = ctx.request_context.lifespan_context
|
||||
_require_device(state)
|
||||
raw = state.device.get_adc_rssi()
|
||||
return {"raw": raw}
|
||||
|
||||
@mcp.tool()
|
||||
async def get_lock_status(ctx: Context) -> dict:
|
||||
"""Check DVB signal lock status (single-shot).
|
||||
|
||||
Returns the raw firmware response string containing lock state,
|
||||
RSSI, and glitch count.
|
||||
"""
|
||||
state: BirdcageState = ctx.request_context.lifespan_context
|
||||
_require_device(state)
|
||||
raw = state.device.get_lock_status()
|
||||
return {"raw": raw}
|
||||
|
||||
@mcp.tool()
|
||||
async def enable_lna(ctx: Context) -> dict:
|
||||
"""Enable the LNA by setting LNB voltage to 13V (V-pol).
|
||||
|
||||
Shortcut for set_lnb_voltage(mode='odu'). Use set_lnb_voltage
|
||||
to switch back to 18V (H-pol) when done.
|
||||
"""
|
||||
state: BirdcageState = ctx.request_context.lifespan_context
|
||||
_require_device(state)
|
||||
state.device.enable_lna()
|
||||
return {"status": "lna_enabled", "voltage": "13V", "polarity": "V"}
|
||||
|
||||
@mcp.tool()
|
||||
async def set_lnb_voltage(ctx: Context, mode: str) -> dict:
|
||||
"""Set LNB DC voltage and polarization.
|
||||
|
||||
Controls the LNB bias voltage on the coax feed line. This
|
||||
determines both LNA power state and receive polarization:
|
||||
|
||||
- 'odu' = 13V (V-pol, LNA enabled) — for radio astronomy / ham
|
||||
- 'stb' = 18V (H-pol, boot default) — standard consumer mode
|
||||
|
||||
V-pol and H-pol see different transponders. The PEAK submenu's
|
||||
'rssits' command alternates between them for comparison.
|
||||
|
||||
Args:
|
||||
mode: 'odu' for 13V/V-pol or 'stb' for 18V/H-pol.
|
||||
"""
|
||||
state: BirdcageState = ctx.request_context.lifespan_context
|
||||
_require_device(state)
|
||||
raw = state.device.set_lnb_voltage(mode)
|
||||
voltage = "13V" if mode.lower() == "odu" else "18V"
|
||||
polarity = "V" if mode.lower() == "odu" else "H"
|
||||
return {
|
||||
"status": "set",
|
||||
"mode": mode.lower(),
|
||||
"voltage": voltage,
|
||||
"polarity": polarity,
|
||||
"raw": raw,
|
||||
}
|
||||
|
||||
@mcp.tool()
|
||||
async def get_dvb_config(ctx: Context) -> dict:
|
||||
"""Read BCM4515 DVB tuner hardware and firmware version.
|
||||
|
||||
Returns chip ID, revision, firmware version, and strap config.
|
||||
"""
|
||||
state: BirdcageState = ctx.request_context.lifespan_context
|
||||
_require_device(state)
|
||||
raw = state.device.get_dvb_config()
|
||||
return {"raw": raw}
|
||||
|
||||
@mcp.tool()
|
||||
async def get_channel_params(ctx: Context) -> dict:
|
||||
"""Read current DVB channel parameters.
|
||||
|
||||
Returns frequency, symbol rate, modulation, LNB polarity,
|
||||
tone settings, and scan configuration.
|
||||
"""
|
||||
state: BirdcageState = ctx.request_context.lifespan_context
|
||||
_require_device(state)
|
||||
raw = state.device.get_channel_params()
|
||||
return {"raw": raw}
|
||||
|
||||
@mcp.tool()
|
||||
async def az_sweep(
|
||||
ctx: Context,
|
||||
start_az: float,
|
||||
span: float,
|
||||
step_cdeg: int = 100,
|
||||
num_xponders: int = 1,
|
||||
timeout: float = 120.0,
|
||||
) -> dict:
|
||||
"""Execute a firmware-accelerated azimuth sweep (azscanwxp).
|
||||
|
||||
Moves to start_az, then sweeps span degrees while measuring
|
||||
RSSI/Lock/SNR at each step. The firmware handles motor movement
|
||||
and measurement atomically — no per-point serial round-trips.
|
||||
|
||||
Returns a list of measurement points, each containing:
|
||||
az (degrees), rssi (ADC counts), lock (0/1), snr (dB).
|
||||
|
||||
SAFETY: Requires homed motors. Do NOT run on uncalibrated axes.
|
||||
|
||||
Args:
|
||||
start_az: Starting azimuth in degrees.
|
||||
span: Total sweep width in degrees.
|
||||
step_cdeg: Step size in centidegrees (100 = 1.00 deg).
|
||||
num_xponders: Transponders to cycle per position.
|
||||
timeout: Max wait time for the sweep to complete.
|
||||
"""
|
||||
state: BirdcageState = ctx.request_context.lifespan_context
|
||||
_require_device(state)
|
||||
points = state.device.az_sweep_firmware(
|
||||
start_az, span, step_cdeg, num_xponders, timeout
|
||||
)
|
||||
return {"points": points, "count": len(points)}
|
||||
178
mcp/src/birdcage_mcp/tools/system.py
Normal file
178
mcp/src/birdcage_mcp/tools/system.py
Normal file
@ -0,0 +1,178 @@
|
||||
"""System and firmware tools — identification, motor dynamics, A3981, NVS."""
|
||||
|
||||
from fastmcp import Context
|
||||
|
||||
from birdcage_mcp.state import BirdcageState
|
||||
|
||||
|
||||
def _require_device(state: BirdcageState):
|
||||
if state.device is None:
|
||||
raise RuntimeError("No device connected. Use the 'connect' tool first.")
|
||||
|
||||
|
||||
def register(mcp):
|
||||
"""Register system tools on the FastMCP instance."""
|
||||
|
||||
@mcp.tool()
|
||||
async def get_firmware_id(ctx: Context) -> dict:
|
||||
"""Read full MCU and firmware identification.
|
||||
|
||||
Returns NVS version, system ID, silicon info, board ID, antenna
|
||||
ID, software version, clock frequencies, and flash layout.
|
||||
"""
|
||||
state: BirdcageState = ctx.request_context.lifespan_context
|
||||
_require_device(state)
|
||||
raw = state.device.get_firmware_id()
|
||||
return {"raw": raw}
|
||||
|
||||
@mcp.tool()
|
||||
async def get_motor_dynamics(ctx: Context) -> dict:
|
||||
"""Read max velocity and acceleration for both motor axes.
|
||||
|
||||
Returns {"az_max_vel", "el_max_vel", "az_accel", "el_accel"}
|
||||
in degrees/sec and degrees/sec^2 respectively.
|
||||
"""
|
||||
state: BirdcageState = ctx.request_context.lifespan_context
|
||||
_require_device(state)
|
||||
return state.device.get_motor_dynamics()
|
||||
|
||||
@mcp.tool()
|
||||
async def set_max_velocity(
|
||||
ctx: Context,
|
||||
motor_id: int,
|
||||
deg_per_sec: float,
|
||||
) -> dict:
|
||||
"""Set the maximum velocity for a motor axis.
|
||||
|
||||
Args:
|
||||
motor_id: 0 for azimuth, 1 for elevation.
|
||||
deg_per_sec: Maximum velocity in degrees per second.
|
||||
Carryout G2 defaults: AZ=65, EL=45.
|
||||
"""
|
||||
state: BirdcageState = ctx.request_context.lifespan_context
|
||||
_require_device(state)
|
||||
if motor_id not in (0, 1):
|
||||
raise ValueError("motor_id must be 0 (azimuth) or 1 (elevation)")
|
||||
state.device.set_max_velocity(motor_id, deg_per_sec)
|
||||
axis = "azimuth" if motor_id == 0 else "elevation"
|
||||
return {"status": "set", "axis": axis, "max_velocity": deg_per_sec}
|
||||
|
||||
@mcp.tool()
|
||||
async def set_max_acceleration(
|
||||
ctx: Context,
|
||||
motor_id: int,
|
||||
accel: float,
|
||||
) -> dict:
|
||||
"""Set the maximum acceleration for a motor axis.
|
||||
|
||||
Args:
|
||||
motor_id: 0 for azimuth, 1 for elevation.
|
||||
accel: Maximum acceleration in degrees/sec^2.
|
||||
Carryout G2 default: 400 for both axes.
|
||||
"""
|
||||
state: BirdcageState = ctx.request_context.lifespan_context
|
||||
_require_device(state)
|
||||
if motor_id not in (0, 1):
|
||||
raise ValueError("motor_id must be 0 (azimuth) or 1 (elevation)")
|
||||
state.device.set_max_acceleration(motor_id, accel)
|
||||
axis = "azimuth" if motor_id == 0 else "elevation"
|
||||
return {"status": "set", "axis": axis, "max_acceleration": accel}
|
||||
|
||||
@mcp.tool()
|
||||
async def get_motor_life(ctx: Context) -> dict:
|
||||
"""Read motor lifetime and usage statistics.
|
||||
|
||||
Returns total moves, total degrees traveled, and uptime hours
|
||||
for both AZ and EL axes.
|
||||
"""
|
||||
state: BirdcageState = ctx.request_context.lifespan_context
|
||||
_require_device(state)
|
||||
raw = state.device.get_motor_life()
|
||||
return {"raw": raw}
|
||||
|
||||
@mcp.tool()
|
||||
async def get_pid_gains(ctx: Context) -> dict:
|
||||
"""Read PID controller gains for both motor axes.
|
||||
|
||||
Returns {"az": {"kp", "kv", "ki"}, "el": {"kp", "kv", "ki"}}.
|
||||
Carryout G2 defaults: AZ Kp=600/Kv=60/Ki=1, EL Kp=250/Kv=50/Ki=1.
|
||||
"""
|
||||
state: BirdcageState = ctx.request_context.lifespan_context
|
||||
_require_device(state)
|
||||
return state.device.get_pid_gains()
|
||||
|
||||
@mcp.tool()
|
||||
async def set_pid_gains(
|
||||
ctx: Context,
|
||||
motor_id: int,
|
||||
kp: float,
|
||||
kv: float,
|
||||
ki: float,
|
||||
) -> dict:
|
||||
"""Set PID controller gains for a motor axis.
|
||||
|
||||
CAUTION: Incorrect PID values can cause oscillation or motor damage.
|
||||
|
||||
Args:
|
||||
motor_id: 0 for azimuth, 1 for elevation.
|
||||
kp: Proportional gain.
|
||||
kv: Velocity (derivative) gain.
|
||||
ki: Integral gain.
|
||||
"""
|
||||
state: BirdcageState = ctx.request_context.lifespan_context
|
||||
_require_device(state)
|
||||
if motor_id not in (0, 1):
|
||||
raise ValueError("motor_id must be 0 (azimuth) or 1 (elevation)")
|
||||
state.device.set_pid_gains(motor_id, kp, kv, ki)
|
||||
axis = "azimuth" if motor_id == 0 else "elevation"
|
||||
return {"status": "set", "axis": axis, "kp": kp, "kv": kv, "ki": ki}
|
||||
|
||||
@mcp.tool()
|
||||
async def get_a3981_diag(ctx: Context) -> dict:
|
||||
"""Read Allegro A3981 stepper driver diagnostic status.
|
||||
|
||||
Returns fault status for both AZ and EL motor drivers.
|
||||
Normal response is "AZ DIAG: OK / EL DIAG: OK".
|
||||
"""
|
||||
state: BirdcageState = ctx.request_context.lifespan_context
|
||||
_require_device(state)
|
||||
raw = state.device.get_a3981_diag()
|
||||
return {"raw": raw}
|
||||
|
||||
@mcp.tool()
|
||||
async def get_a3981_modes(ctx: Context) -> dict:
|
||||
"""Read A3981 stepper driver operating modes.
|
||||
|
||||
Returns step mode (AUTO/fixed), current mode (AUTO/HiZ/LoZ),
|
||||
and step size (FULL=16 through SIXTEENTH=1) for both drivers.
|
||||
"""
|
||||
state: BirdcageState = ctx.request_context.lifespan_context
|
||||
_require_device(state)
|
||||
return state.device.get_a3981_modes()
|
||||
|
||||
@mcp.tool()
|
||||
async def nvs_dump(ctx: Context) -> dict:
|
||||
"""Dump all non-volatile storage (NVS) values.
|
||||
|
||||
Returns the complete NVS table with index, name, current value,
|
||||
saved value, and default value for every setting.
|
||||
"""
|
||||
state: BirdcageState = ctx.request_context.lifespan_context
|
||||
_require_device(state)
|
||||
raw = state.device.nvs_dump()
|
||||
return {"raw": raw}
|
||||
|
||||
@mcp.tool()
|
||||
async def nvs_read(ctx: Context, index: int) -> dict:
|
||||
"""Read a single NVS value by index.
|
||||
|
||||
Key indices: 20=Disable Tracker, 80=AZ Max Vel, 85=EL Max Vel,
|
||||
101=Min EL, 102=Max EL, 128-133=PID gains.
|
||||
|
||||
Args:
|
||||
index: NVS index number (0-143).
|
||||
"""
|
||||
state: BirdcageState = ctx.request_context.lifespan_context
|
||||
_require_device(state)
|
||||
raw = state.device.nvs_read(index)
|
||||
return {"index": index, "raw": raw}
|
||||
74
mcp/tests/conftest.py
Normal file
74
mcp/tests/conftest.py
Normal file
@ -0,0 +1,74 @@
|
||||
"""Shared fixtures for birdcage-mcp tests.
|
||||
|
||||
Uses FastMCP's run_server_async to spin up a real MCP server backed by
|
||||
DemoDevice + DemoCraftClient. No serial hardware, no subprocesses.
|
||||
"""
|
||||
|
||||
import json
|
||||
from contextlib import asynccontextmanager
|
||||
|
||||
import pytest
|
||||
from birdcage.demo import DemoCraftClient, DemoDevice
|
||||
from fastmcp import Client, FastMCP
|
||||
from fastmcp.client.transports import StreamableHttpTransport
|
||||
from fastmcp.utilities.tests import run_server_async
|
||||
|
||||
from birdcage_mcp.state import BirdcageState
|
||||
|
||||
|
||||
@asynccontextmanager
|
||||
async def _test_lifespan(server: FastMCP):
|
||||
"""Test lifespan that pre-connects a DemoDevice."""
|
||||
device = DemoDevice()
|
||||
device.connect()
|
||||
device.initialize()
|
||||
|
||||
state = BirdcageState(
|
||||
device=device,
|
||||
craft_client=DemoCraftClient(),
|
||||
demo_mode=True,
|
||||
serial_port="/dev/demo",
|
||||
firmware_name="g2",
|
||||
connected=True,
|
||||
)
|
||||
yield state
|
||||
|
||||
|
||||
def _build_server() -> FastMCP:
|
||||
"""Build a FastMCP server with all tools registered."""
|
||||
from birdcage_mcp import prompts, resources
|
||||
from birdcage_mcp.tools import (
|
||||
connection,
|
||||
console,
|
||||
movement,
|
||||
satellite,
|
||||
signal,
|
||||
system,
|
||||
)
|
||||
|
||||
mcp = FastMCP("birdcage-test", lifespan=_test_lifespan)
|
||||
connection.register(mcp)
|
||||
movement.register(mcp)
|
||||
signal.register(mcp)
|
||||
system.register(mcp)
|
||||
satellite.register(mcp)
|
||||
console.register(mcp)
|
||||
resources.register(mcp)
|
||||
prompts.register(mcp)
|
||||
return mcp
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
async def mcp_client():
|
||||
"""Yield a connected MCP client backed by DemoDevice."""
|
||||
server = _build_server()
|
||||
async with (
|
||||
run_server_async(server) as url,
|
||||
Client(StreamableHttpTransport(url)) as client,
|
||||
):
|
||||
yield client
|
||||
|
||||
|
||||
def parse_result(result) -> dict:
|
||||
"""Extract the dict from a tool call result."""
|
||||
return json.loads(result.content[0].text)
|
||||
29
mcp/tests/test_connection.py
Normal file
29
mcp/tests/test_connection.py
Normal file
@ -0,0 +1,29 @@
|
||||
"""Tests for connection tools (connect, disconnect, status)."""
|
||||
|
||||
import pytest
|
||||
from conftest import parse_result
|
||||
from fastmcp import Client
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_status_shows_connected(mcp_client: Client):
|
||||
result = await mcp_client.call_tool("status", {})
|
||||
data = parse_result(result)
|
||||
assert data["connected"] is True
|
||||
assert data["demo_mode"] is True
|
||||
assert data["firmware"] == "g2"
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_connect_demo_noop(mcp_client: Client):
|
||||
result = await mcp_client.call_tool("connect", {})
|
||||
data = parse_result(result)
|
||||
assert data["mode"] == "demo"
|
||||
assert data["status"] == "connected"
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_disconnect_demo_noop(mcp_client: Client):
|
||||
result = await mcp_client.call_tool("disconnect", {})
|
||||
data = parse_result(result)
|
||||
assert data["status"] == "demo"
|
||||
56
mcp/tests/test_console.py
Normal file
56
mcp/tests/test_console.py
Normal file
@ -0,0 +1,56 @@
|
||||
"""Tests for raw console tool (safety-gated)."""
|
||||
|
||||
import pytest
|
||||
from conftest import parse_result
|
||||
from fastmcp import Client
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_send_raw_command(mcp_client: Client):
|
||||
result = await mcp_client.call_tool(
|
||||
"send_raw_command", {"command": "?"}
|
||||
)
|
||||
data = parse_result(result)
|
||||
assert "response" in data
|
||||
assert "Available commands" in data["response"]
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_q_command_blocked(mcp_client: Client):
|
||||
result = await mcp_client.call_tool(
|
||||
"send_raw_command", {"command": "q"}
|
||||
)
|
||||
data = parse_result(result)
|
||||
assert data["blocked"] is True
|
||||
assert "power cycle" in data["error"]
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_q_command_blocked_with_whitespace(mcp_client: Client):
|
||||
result = await mcp_client.call_tool(
|
||||
"send_raw_command", {"command": " Q "}
|
||||
)
|
||||
data = parse_result(result)
|
||||
assert data["blocked"] is True
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_submenu_navigation(mcp_client: Client):
|
||||
result = await mcp_client.call_tool(
|
||||
"send_raw_command", {"command": "mot"}
|
||||
)
|
||||
data = parse_result(result)
|
||||
assert "MOT>" in data["response"]
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_raw_motor_query(mcp_client: Client):
|
||||
# Enter mot submenu first
|
||||
await mcp_client.call_tool("send_raw_command", {"command": "mot"})
|
||||
# Query position
|
||||
result = await mcp_client.call_tool(
|
||||
"send_raw_command", {"command": "a"}
|
||||
)
|
||||
data = parse_result(result)
|
||||
assert "Angle[0]" in data["response"]
|
||||
assert "Angle[1]" in data["response"]
|
||||
100
mcp/tests/test_movement.py
Normal file
100
mcp/tests/test_movement.py
Normal file
@ -0,0 +1,100 @@
|
||||
"""Tests for movement tools."""
|
||||
|
||||
import pytest
|
||||
from conftest import parse_result
|
||||
from fastmcp import Client
|
||||
from fastmcp.exceptions import ToolError
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_get_position(mcp_client: Client):
|
||||
result = await mcp_client.call_tool("get_position", {})
|
||||
data = parse_result(result)
|
||||
assert "azimuth" in data
|
||||
assert "elevation" in data
|
||||
assert isinstance(data["azimuth"], float)
|
||||
assert isinstance(data["elevation"], float)
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_move_to(mcp_client: Client):
|
||||
result = await mcp_client.call_tool(
|
||||
"move_to", {"azimuth": 200.0, "elevation": 40.0}
|
||||
)
|
||||
data = parse_result(result)
|
||||
assert data["status"] == "moving"
|
||||
assert data["target_azimuth"] == 200.0
|
||||
assert data["target_elevation"] == 40.0
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_move_motor_az(mcp_client: Client):
|
||||
result = await mcp_client.call_tool(
|
||||
"move_motor", {"motor_id": 0, "degrees": 90.0}
|
||||
)
|
||||
data = parse_result(result)
|
||||
assert data["axis"] == "azimuth"
|
||||
assert data["target"] == 90.0
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_move_motor_el(mcp_client: Client):
|
||||
result = await mcp_client.call_tool(
|
||||
"move_motor", {"motor_id": 1, "degrees": 30.0}
|
||||
)
|
||||
data = parse_result(result)
|
||||
assert data["axis"] == "elevation"
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_move_motor_invalid_id(mcp_client: Client):
|
||||
with pytest.raises(ToolError):
|
||||
await mcp_client.call_tool(
|
||||
"move_motor", {"motor_id": 2, "degrees": 10.0}
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_home_motor(mcp_client: Client):
|
||||
result = await mcp_client.call_tool("home_motor", {"motor_id": 1})
|
||||
data = parse_result(result)
|
||||
assert data["status"] == "homing"
|
||||
assert data["axis"] == "elevation"
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_engage_release(mcp_client: Client):
|
||||
result = await mcp_client.call_tool("release_motors", {})
|
||||
data = parse_result(result)
|
||||
assert data["status"] == "released"
|
||||
|
||||
result = await mcp_client.call_tool("engage_motors", {})
|
||||
data = parse_result(result)
|
||||
assert data["status"] == "engaged"
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_stow(mcp_client: Client):
|
||||
result = await mcp_client.call_tool("stow", {})
|
||||
data = parse_result(result)
|
||||
assert data["status"] == "stowing"
|
||||
assert data["target_azimuth"] == 0.0
|
||||
assert data["target_elevation"] == 65.0
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_get_step_positions(mcp_client: Client):
|
||||
result = await mcp_client.call_tool("get_step_positions", {})
|
||||
data = parse_result(result)
|
||||
assert "az_steps" in data
|
||||
assert "el_steps" in data
|
||||
assert isinstance(data["az_steps"], int)
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_get_el_limits(mcp_client: Client):
|
||||
result = await mcp_client.call_tool("get_el_limits", {})
|
||||
data = parse_result(result)
|
||||
assert data["min"] == 18.0
|
||||
assert data["max"] == 65.0
|
||||
assert data["home"] == 65.0
|
||||
86
mcp/tests/test_satellite.py
Normal file
86
mcp/tests/test_satellite.py
Normal file
@ -0,0 +1,86 @@
|
||||
"""Tests for satellite tracking tools."""
|
||||
|
||||
import pytest
|
||||
from conftest import parse_result
|
||||
from fastmcp import Client
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_search_satellites(mcp_client: Client):
|
||||
result = await mcp_client.call_tool(
|
||||
"search_satellites", {"query": "ISS"}
|
||||
)
|
||||
data = parse_result(result)
|
||||
assert data["count"] >= 1
|
||||
names = [r["name"] for r in data["results"]]
|
||||
assert any("ISS" in n for n in names)
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_search_no_results(mcp_client: Client):
|
||||
result = await mcp_client.call_tool(
|
||||
"search_satellites", {"query": "nonexistent_xyz"}
|
||||
)
|
||||
data = parse_result(result)
|
||||
assert data["count"] == 0
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_search_with_limit(mcp_client: Client):
|
||||
result = await mcp_client.call_tool(
|
||||
"search_satellites", {"query": "", "limit": 3}
|
||||
)
|
||||
data = parse_result(result)
|
||||
# Empty query won't match any catalog entries
|
||||
assert data["count"] >= 0
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_get_passes(mcp_client: Client):
|
||||
result = await mcp_client.call_tool(
|
||||
"get_passes", {"norad_id": 25544}
|
||||
)
|
||||
data = parse_result(result)
|
||||
assert data["count"] > 0
|
||||
p = data["passes"][0]
|
||||
assert "aos_time" in p
|
||||
assert "tca_time" in p
|
||||
assert "los_time" in p
|
||||
assert "max_elevation" in p
|
||||
assert p["norad_id"] == 25544
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_get_next_pass(mcp_client: Client):
|
||||
result = await mcp_client.call_tool(
|
||||
"get_next_pass", {"norad_id": 25544}
|
||||
)
|
||||
data = parse_result(result)
|
||||
assert data["pass"] is not None
|
||||
assert data["pass"]["satellite_name"] == "ISS (ZARYA)"
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_get_visible_targets(mcp_client: Client):
|
||||
result = await mcp_client.call_tool(
|
||||
"get_visible_targets", {}
|
||||
)
|
||||
data = parse_result(result)
|
||||
# Celestial bodies (Moon, Sun, Jupiter) are always visible
|
||||
assert data["count"] >= 1
|
||||
names = [t["name"] for t in data["targets"]]
|
||||
# At least celestial bodies should be present
|
||||
assert any(
|
||||
n in names for n in ("Moon", "Sun", "Jupiter")
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_get_visible_targets_with_min_alt(mcp_client: Client):
|
||||
result = await mcp_client.call_tool(
|
||||
"get_visible_targets", {"min_alt": 80.0}
|
||||
)
|
||||
data = parse_result(result)
|
||||
# Very high min_alt should filter out most targets
|
||||
for t in data["targets"]:
|
||||
assert t["altitude"] >= 80.0
|
||||
112
mcp/tests/test_signal.py
Normal file
112
mcp/tests/test_signal.py
Normal file
@ -0,0 +1,112 @@
|
||||
"""Tests for signal measurement tools."""
|
||||
|
||||
import pytest
|
||||
from conftest import parse_result
|
||||
from fastmcp import Client
|
||||
from fastmcp.exceptions import ToolError
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_get_rssi_default(mcp_client: Client):
|
||||
result = await mcp_client.call_tool("get_rssi", {})
|
||||
data = parse_result(result)
|
||||
assert data["reads"] == 10
|
||||
assert isinstance(data["average"], int)
|
||||
assert isinstance(data["current"], int)
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_get_rssi_custom_iterations(mcp_client: Client):
|
||||
result = await mcp_client.call_tool("get_rssi", {"iterations": 5})
|
||||
data = parse_result(result)
|
||||
assert data["reads"] == 5
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_get_adc_rssi(mcp_client: Client):
|
||||
result = await mcp_client.call_tool("get_adc_rssi", {})
|
||||
data = parse_result(result)
|
||||
assert "raw" in data
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_get_lock_status(mcp_client: Client):
|
||||
result = await mcp_client.call_tool("get_lock_status", {})
|
||||
data = parse_result(result)
|
||||
assert "raw" in data
|
||||
assert "Lock:" in data["raw"]
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_enable_lna(mcp_client: Client):
|
||||
result = await mcp_client.call_tool("enable_lna", {})
|
||||
data = parse_result(result)
|
||||
assert data["status"] == "lna_enabled"
|
||||
assert data["voltage"] == "13V"
|
||||
assert data["polarity"] == "V"
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_set_lnb_voltage_odu(mcp_client: Client):
|
||||
result = await mcp_client.call_tool(
|
||||
"set_lnb_voltage", {"mode": "odu"}
|
||||
)
|
||||
data = parse_result(result)
|
||||
assert data["voltage"] == "13V"
|
||||
assert data["polarity"] == "V"
|
||||
assert data["mode"] == "odu"
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_set_lnb_voltage_stb(mcp_client: Client):
|
||||
result = await mcp_client.call_tool(
|
||||
"set_lnb_voltage", {"mode": "stb"}
|
||||
)
|
||||
data = parse_result(result)
|
||||
assert data["voltage"] == "18V"
|
||||
assert data["polarity"] == "H"
|
||||
assert data["mode"] == "stb"
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_set_lnb_voltage_invalid(mcp_client: Client):
|
||||
with pytest.raises(ToolError):
|
||||
await mcp_client.call_tool(
|
||||
"set_lnb_voltage", {"mode": "invalid"}
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_get_dvb_config(mcp_client: Client):
|
||||
result = await mcp_client.call_tool("get_dvb_config", {})
|
||||
data = parse_result(result)
|
||||
assert "BCM" in data["raw"]
|
||||
assert "0x4515" in data["raw"]
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_get_channel_params(mcp_client: Client):
|
||||
result = await mcp_client.call_tool("get_channel_params", {})
|
||||
data = parse_result(result)
|
||||
assert "Frequency" in data["raw"]
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_az_sweep(mcp_client: Client):
|
||||
result = await mcp_client.call_tool(
|
||||
"az_sweep",
|
||||
{
|
||||
"start_az": 195.0,
|
||||
"span": 10.0,
|
||||
"step_cdeg": 200,
|
||||
"num_xponders": 1,
|
||||
},
|
||||
)
|
||||
data = parse_result(result)
|
||||
assert data["count"] > 0
|
||||
assert len(data["points"]) == data["count"]
|
||||
pt = data["points"][0]
|
||||
assert "az" in pt
|
||||
assert "rssi" in pt
|
||||
assert "lock" in pt
|
||||
assert "snr" in pt
|
||||
121
mcp/tests/test_system.py
Normal file
121
mcp/tests/test_system.py
Normal file
@ -0,0 +1,121 @@
|
||||
"""Tests for system and firmware tools."""
|
||||
|
||||
import pytest
|
||||
from conftest import parse_result
|
||||
from fastmcp import Client
|
||||
from fastmcp.exceptions import ToolError
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_get_firmware_id(mcp_client: Client):
|
||||
result = await mcp_client.call_tool("get_firmware_id", {})
|
||||
data = parse_result(result)
|
||||
assert "02.02.48" in data["raw"]
|
||||
assert "TWELINCH" in data["raw"]
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_get_motor_dynamics(mcp_client: Client):
|
||||
result = await mcp_client.call_tool("get_motor_dynamics", {})
|
||||
data = parse_result(result)
|
||||
assert data["az_max_vel"] == 65.0
|
||||
assert data["el_max_vel"] == 45.0
|
||||
assert data["az_accel"] == 400.0
|
||||
assert data["el_accel"] == 400.0
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_set_max_velocity(mcp_client: Client):
|
||||
result = await mcp_client.call_tool(
|
||||
"set_max_velocity", {"motor_id": 0, "deg_per_sec": 30.0}
|
||||
)
|
||||
data = parse_result(result)
|
||||
assert data["axis"] == "azimuth"
|
||||
assert data["max_velocity"] == 30.0
|
||||
|
||||
# Verify it stuck
|
||||
result = await mcp_client.call_tool("get_motor_dynamics", {})
|
||||
data = parse_result(result)
|
||||
assert data["az_max_vel"] == 30.0
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_set_max_velocity_invalid_motor(mcp_client: Client):
|
||||
with pytest.raises(ToolError):
|
||||
await mcp_client.call_tool(
|
||||
"set_max_velocity", {"motor_id": 3, "deg_per_sec": 10.0}
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_set_max_acceleration(mcp_client: Client):
|
||||
result = await mcp_client.call_tool(
|
||||
"set_max_acceleration", {"motor_id": 1, "accel": 200.0}
|
||||
)
|
||||
data = parse_result(result)
|
||||
assert data["axis"] == "elevation"
|
||||
assert data["max_acceleration"] == 200.0
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_get_motor_life(mcp_client: Client):
|
||||
result = await mcp_client.call_tool("get_motor_life", {})
|
||||
data = parse_result(result)
|
||||
assert "AZ total moves" in data["raw"]
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_get_pid_gains(mcp_client: Client):
|
||||
result = await mcp_client.call_tool("get_pid_gains", {})
|
||||
data = parse_result(result)
|
||||
assert data["az"]["kp"] == 600.0
|
||||
assert data["az"]["kv"] == 60.0
|
||||
assert data["el"]["kp"] == 250.0
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_set_pid_gains(mcp_client: Client):
|
||||
result = await mcp_client.call_tool(
|
||||
"set_pid_gains",
|
||||
{"motor_id": 0, "kp": 500.0, "kv": 55.0, "ki": 2.0},
|
||||
)
|
||||
data = parse_result(result)
|
||||
assert data["status"] == "set"
|
||||
assert data["kp"] == 500.0
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_get_a3981_diag(mcp_client: Client):
|
||||
result = await mcp_client.call_tool("get_a3981_diag", {})
|
||||
data = parse_result(result)
|
||||
assert "OK" in data["raw"]
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_get_a3981_modes(mcp_client: Client):
|
||||
result = await mcp_client.call_tool("get_a3981_modes", {})
|
||||
data = parse_result(result)
|
||||
assert "AUTO" in data["step_mode"]
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_nvs_dump(mcp_client: Client):
|
||||
result = await mcp_client.call_tool("nvs_dump", {})
|
||||
data = parse_result(result)
|
||||
assert "Disable Tracker" in data["raw"]
|
||||
assert "AZ Max Vel" in data["raw"]
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_nvs_read(mcp_client: Client):
|
||||
result = await mcp_client.call_tool("nvs_read", {"index": 20})
|
||||
data = parse_result(result)
|
||||
assert data["index"] == 20
|
||||
assert "Disable Tracker" in data["raw"]
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_nvs_read_missing_index(mcp_client: Client):
|
||||
result = await mcp_client.call_tool("nvs_read", {"index": 999})
|
||||
data = parse_result(result)
|
||||
assert "not found" in data["raw"]
|
||||
1678
mcp/uv.lock
generated
Normal file
1678
mcp/uv.lock
generated
Normal file
File diff suppressed because it is too large
Load Diff
@ -1,6 +1,15 @@
|
||||
"""birdcage: Winegard satellite dish control for amateur radio sky tracking."""
|
||||
|
||||
from birdcage.antenna import AntennaConfig, BirdcageAntenna
|
||||
from birdcage.bridge import SerialBridge
|
||||
from birdcage.craft_client import (
|
||||
CraftClient,
|
||||
CraftTrackingState,
|
||||
PassPrediction,
|
||||
SearchResult,
|
||||
TargetPosition,
|
||||
)
|
||||
from birdcage.demo import DemoCraftClient, DemoDevice
|
||||
from birdcage.leapfrog import apply_leapfrog
|
||||
from birdcage.protocol import (
|
||||
CarryoutG2Protocol,
|
||||
@ -16,11 +25,19 @@ __all__ = [
|
||||
"AntennaConfig",
|
||||
"BirdcageAntenna",
|
||||
"CarryoutG2Protocol",
|
||||
"CraftClient",
|
||||
"CraftTrackingState",
|
||||
"DemoCraftClient",
|
||||
"DemoDevice",
|
||||
"FirmwareProtocol",
|
||||
"HAL000Protocol",
|
||||
"HAL205Protocol",
|
||||
"PassPrediction",
|
||||
"Position",
|
||||
"RssiReading",
|
||||
"RotctldServer",
|
||||
"SearchResult",
|
||||
"SerialBridge",
|
||||
"TargetPosition",
|
||||
"apply_leapfrog",
|
||||
]
|
||||
|
||||
@ -1,6 +1,6 @@
|
||||
"""Thread-safe bridge between Birdcage TUI and CarryoutG2Protocol.
|
||||
"""Thread-safe bridge between consumers and CarryoutG2Protocol.
|
||||
|
||||
Wraps all serial I/O in a threading.Lock so the TUI's worker threads
|
||||
Wraps all serial I/O in a threading.Lock so concurrent callers
|
||||
don't stomp on each other. Tracks the current firmware submenu to
|
||||
minimize unnecessary q-then-reenter transitions.
|
||||
"""
|
||||
@ -53,7 +53,7 @@ _MENU_COMMANDS: dict[Menu, str] = {
|
||||
|
||||
|
||||
class SerialBridge:
|
||||
"""Thread-safe wrapper around CarryoutG2Protocol for TUI consumption.
|
||||
"""Thread-safe wrapper around CarryoutG2Protocol.
|
||||
|
||||
All public methods acquire a lock before touching the serial port.
|
||||
The bridge tracks the current firmware submenu so it can skip
|
||||
@ -68,7 +68,7 @@ class SerialBridge:
|
||||
self._connected = False
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Menu prompt → string mapping for status display
|
||||
# Menu prompt -> string mapping for status display
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
_MENU_PROMPTS: dict[Menu, str] = {
|
||||
@ -297,7 +297,7 @@ class SerialBridge:
|
||||
"el_accel": 0.0,
|
||||
}
|
||||
|
||||
# mv → "Max Vel [0] = 65.0 Max Vel [1] = 45.0" or similar
|
||||
# mv -> "Max Vel [0] = 65.0 Max Vel [1] = 45.0" or similar
|
||||
vel_matches = re.findall(r"Max Vel\s*\[(\d)\]\s*=\s*(-?\d+\.?\d*)", mv_resp)
|
||||
for motor_id, val in vel_matches:
|
||||
if motor_id == "0":
|
||||
@ -305,7 +305,7 @@ class SerialBridge:
|
||||
elif motor_id == "1":
|
||||
result["el_max_vel"] = float(val)
|
||||
|
||||
# ma → "Accel[0] = 400.0 Accel[1] = 400.0"
|
||||
# ma -> "Accel[0] = 400.0 Accel[1] = 400.0"
|
||||
acc_matches = re.findall(r"Accel\[(\d)\]\s*=\s*(-?\d+\.?\d*)", ma_resp)
|
||||
for motor_id, val in acc_matches:
|
||||
if motor_id == "0":
|
||||
@ -316,7 +316,7 @@ class SerialBridge:
|
||||
return result
|
||||
|
||||
def set_max_velocity(self, motor_id: int, deg_per_sec: float) -> None:
|
||||
"""Set max velocity for a motor axis (°/s). Firmware: MOT> mv [motor] [vel]."""
|
||||
"""Set max velocity for a motor axis (deg/s)."""
|
||||
with self._lock:
|
||||
self._ensure_menu(Menu.MOT)
|
||||
self._send(f"mv {motor_id} {deg_per_sec:.1f}")
|
||||
@ -324,7 +324,7 @@ class SerialBridge:
|
||||
def set_max_acceleration(self, motor_id: int, accel: float) -> None:
|
||||
"""Set max acceleration for a motor axis.
|
||||
|
||||
Firmware: MOT> ma [motor] [accel] (°/s²).
|
||||
Firmware: MOT> ma [motor] [accel] (deg/s^2).
|
||||
"""
|
||||
with self._lock:
|
||||
self._ensure_menu(Menu.MOT)
|
||||
@ -460,7 +460,7 @@ class SerialBridge:
|
||||
Args:
|
||||
start_az: Starting azimuth in degrees.
|
||||
span: Total sweep width in degrees.
|
||||
step_cdeg: Step size in centidegrees (100 = 1.00°).
|
||||
step_cdeg: Step size in centidegrees (100 = 1.00 deg).
|
||||
num_xponders: Number of transponders to cycle per position.
|
||||
timeout: Serial read timeout for the long-running command.
|
||||
|
||||
@ -526,10 +526,26 @@ class SerialBridge:
|
||||
raise ValueError(f"Could not parse RSSI: {response!r}")
|
||||
|
||||
def enable_lna(self) -> None:
|
||||
"""Enable LNA in ODU mode (sets LNB to 13V)."""
|
||||
"""Enable LNA in ODU mode (13V). Alias for set_lnb_voltage('odu')."""
|
||||
self.set_lnb_voltage("odu")
|
||||
|
||||
def set_lnb_voltage(self, mode: str) -> str:
|
||||
"""Set LNB DC voltage mode.
|
||||
|
||||
Args:
|
||||
mode: 'odu' for 13V (V-pol, LNA enabled) or 'stb' for 18V (H-pol).
|
||||
|
||||
Returns:
|
||||
Raw firmware response.
|
||||
"""
|
||||
mode = mode.strip().lower()
|
||||
if mode not in ("odu", "stb"):
|
||||
raise ValueError(
|
||||
f"Invalid LNB mode {mode!r}: use 'odu' (13V) or 'stb' (18V)"
|
||||
)
|
||||
with self._lock:
|
||||
self._ensure_menu(Menu.DVB)
|
||||
self._send("lnbdc odu")
|
||||
return self._send(f"lnbdc {mode}")
|
||||
|
||||
def get_lock_status(self) -> str:
|
||||
"""Read quick lock status (single-shot)."""
|
||||
@ -2,7 +2,7 @@
|
||||
|
||||
Provides satellite search, pass predictions, and real-time sky positions
|
||||
from the Craft orbital mechanics API. All methods are blocking — designed
|
||||
to be called from @work(thread=True) workers in the TUI.
|
||||
to be called from worker threads.
|
||||
|
||||
Uses urllib.request + json only (no requests/httpx dependency).
|
||||
"""
|
||||
@ -1,16 +1,23 @@
|
||||
"""Synthetic demo device for the Birdcage TUI.
|
||||
"""Synthetic demo device for Birdcage.
|
||||
|
||||
Drop-in replacement for SerialBridge that simulates a Winegard Carryout G2
|
||||
dish with motor movement, RSSI signal modeling, and canned firmware responses.
|
||||
No serial hardware required.
|
||||
|
||||
Also provides DemoCraftClient — a duck-typed replacement for CraftClient that
|
||||
returns synthetic satellite data with zero HTTP calls. Supports time-varying
|
||||
LEO arcs so the tracking loop drives real pass events to the camera overlay.
|
||||
"""
|
||||
|
||||
import contextlib
|
||||
import datetime as _dt
|
||||
import math
|
||||
import random
|
||||
import time
|
||||
from enum import Enum, auto
|
||||
|
||||
from birdcage.craft_client import PassPrediction, SearchResult, TargetPosition
|
||||
|
||||
|
||||
class _DemoMenu(Enum):
|
||||
"""Simulated firmware submenu states."""
|
||||
@ -212,6 +219,9 @@ class DemoDevice:
|
||||
self._az_accel = 400.0
|
||||
self._el_accel = 400.0
|
||||
|
||||
# LNB state (boot default is 18V / H-pol / STB mode).
|
||||
self._lnb_mode = "stb"
|
||||
|
||||
# Submenu tracking for console simulation.
|
||||
self._menu = _DemoMenu.ROOT
|
||||
|
||||
@ -375,6 +385,10 @@ class DemoDevice:
|
||||
timeout: float = 120,
|
||||
) -> list[dict[str, float]]:
|
||||
"""Simulate a firmware azscanwxp sweep with Gaussian signal peak."""
|
||||
# Snap EL to target so 2D scans compute correct per-row signal.
|
||||
# Without this, move_motor(1, el) only sets _target_el — _el stays
|
||||
# stale because the position interpolator never runs mid-sweep.
|
||||
self._el = self._target_el
|
||||
step_deg = step_cdeg / 100.0
|
||||
if step_deg <= 0:
|
||||
step_deg = 1.0
|
||||
@ -418,7 +432,18 @@ class DemoDevice:
|
||||
}
|
||||
|
||||
def enable_lna(self) -> None:
|
||||
pass # No-op in demo mode.
|
||||
self._lnb_mode = "odu"
|
||||
|
||||
def set_lnb_voltage(self, mode: str) -> str:
|
||||
mode = mode.strip().lower()
|
||||
if mode not in ("odu", "stb"):
|
||||
raise ValueError(
|
||||
f"Invalid LNB mode {mode!r}: use 'odu' (13V) or 'stb' (18V)"
|
||||
)
|
||||
self._lnb_mode = mode
|
||||
if mode == "odu":
|
||||
return "Enabled LNB ODU"
|
||||
return "Enabled LNB STB"
|
||||
|
||||
def get_lock_status(self) -> str:
|
||||
rssi = int(self._compute_rssi())
|
||||
@ -724,3 +749,216 @@ class DemoDevice:
|
||||
if cmd == "reboot":
|
||||
return "Rebooting...\nApplication Starting Kinetis PCB...\nTRK>"
|
||||
return f"Unknown command: {cmd}\nOS>"
|
||||
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# DemoCraftClient — offline Craft API replacement
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
# Canned satellite catalog for search results.
|
||||
_DEMO_CATALOG: list[dict] = [
|
||||
{
|
||||
"name": "ISS (ZARYA)",
|
||||
"type": "satellite",
|
||||
"id": "25544",
|
||||
"groups": ["stations"],
|
||||
},
|
||||
{
|
||||
"name": "NOAA 19",
|
||||
"type": "satellite",
|
||||
"id": "33591",
|
||||
"groups": ["weather"],
|
||||
},
|
||||
{
|
||||
"name": "SO-50 (SAUDISAT 1C)",
|
||||
"type": "satellite",
|
||||
"id": "27607",
|
||||
"groups": ["amateur"],
|
||||
},
|
||||
{
|
||||
"name": "TEVEL-2",
|
||||
"type": "satellite",
|
||||
"id": "50988",
|
||||
"groups": ["amateur"],
|
||||
},
|
||||
{
|
||||
"name": "AO-91 (FOX-1B)",
|
||||
"type": "satellite",
|
||||
"id": "43017",
|
||||
"groups": ["amateur"],
|
||||
},
|
||||
{
|
||||
"name": "Moon",
|
||||
"type": "celestial",
|
||||
"id": "moon",
|
||||
"groups": ["solar-system"],
|
||||
},
|
||||
{
|
||||
"name": "Sun",
|
||||
"type": "celestial",
|
||||
"id": "sun",
|
||||
"groups": ["solar-system"],
|
||||
},
|
||||
{
|
||||
"name": "Jupiter",
|
||||
"type": "celestial",
|
||||
"id": "jupiter",
|
||||
"groups": ["solar-system"],
|
||||
},
|
||||
]
|
||||
|
||||
# LEO arc parameters: (phase_offset_minutes, period_minutes, max_el)
|
||||
_LEO_ARCS: dict[str, tuple[float, float, float]] = {
|
||||
"25544": (0.0, 10.0, 55.0), # ISS — primary demo target
|
||||
"33591": (3.0, 9.0, 42.0), # NOAA 19
|
||||
"27607": (5.0, 8.5, 38.0), # SO-50
|
||||
"50988": (7.0, 11.0, 48.0), # TEVEL-2
|
||||
"43017": (2.0, 9.5, 35.0), # AO-91
|
||||
}
|
||||
|
||||
|
||||
def _leo_position(target_id: str, t: float) -> tuple[float, float]:
|
||||
"""Compute time-varying AZ/EL for a simulated LEO satellite.
|
||||
|
||||
The arc traces: rise east (AZ ~90, EL 0) -> TCA (~180, max_el)
|
||||
-> set west (AZ ~270, EL 0) over one period, then resets.
|
||||
Returns (az, el) where el < 0 means below horizon.
|
||||
"""
|
||||
offset, period, max_el = _LEO_ARCS.get(target_id, (0.0, 10.0, 40.0))
|
||||
period_sec = period * 60.0
|
||||
phase = ((t + offset * 60.0) % period_sec) / period_sec # 0.0 -> 1.0
|
||||
|
||||
# Visible window: phase 0.0-0.5 = above horizon, 0.5-1.0 = below
|
||||
if phase > 0.5:
|
||||
return 0.0, -10.0 # Below horizon
|
||||
|
||||
# Map 0->0.5 to AZ 90->270, EL 0->max->0 (sine arc)
|
||||
arc_phase = phase / 0.5 # 0.0 -> 1.0 through the visible pass
|
||||
az = 90.0 + 180.0 * arc_phase
|
||||
el = max_el * math.sin(math.pi * arc_phase)
|
||||
return az, el
|
||||
|
||||
|
||||
def _celestial_position(target_id: str, t: float) -> tuple[float, float]:
|
||||
"""Slow-drift positions for celestial bodies."""
|
||||
if target_id == "moon":
|
||||
az = 145.0 + 0.5 * math.sin(t / 600.0) * (t / 60.0 % 10)
|
||||
el = 32.0 + 3.0 * math.sin(t / 900.0)
|
||||
return az, max(el, 5.0)
|
||||
if target_id == "sun":
|
||||
az = 210.0 + 0.3 * (t / 60.0 % 15)
|
||||
el = 45.0 + 5.0 * math.sin(t / 1200.0)
|
||||
return az, max(el, 10.0)
|
||||
# Jupiter — nearly fixed
|
||||
return 255.0, 28.0
|
||||
|
||||
|
||||
class DemoCraftClient:
|
||||
"""Offline replacement for CraftClient returning synthetic orbital data.
|
||||
|
||||
Duck-typed to match CraftClient's interface. No HTTP calls are made.
|
||||
LEO satellites trace realistic arcs using time.monotonic() so the
|
||||
tracking loop sees genuine AOS/TCA/LOS transitions.
|
||||
"""
|
||||
|
||||
def __init__(self) -> None:
|
||||
self._t0 = time.monotonic()
|
||||
|
||||
def health(self) -> bool:
|
||||
return True
|
||||
|
||||
def search(self, query: str, limit: int = 20) -> list[SearchResult]:
|
||||
q_lower = query.lower()
|
||||
results = []
|
||||
for entry in _DEMO_CATALOG:
|
||||
if q_lower in entry["name"].lower():
|
||||
results.append(
|
||||
SearchResult(
|
||||
name=entry["name"],
|
||||
target_type=entry["type"],
|
||||
target_id=entry["id"],
|
||||
score=1.0,
|
||||
groups=entry["groups"],
|
||||
)
|
||||
)
|
||||
if len(results) >= limit:
|
||||
break
|
||||
return results
|
||||
|
||||
def get_passes(self, norad_id: int, hours: int = 24) -> list[PassPrediction]:
|
||||
now = _dt.datetime.now(tz=_dt.UTC)
|
||||
name = "Unknown"
|
||||
for entry in _DEMO_CATALOG:
|
||||
if entry["id"] == str(norad_id):
|
||||
name = entry["name"]
|
||||
break
|
||||
|
||||
arc_params = _LEO_ARCS.get(str(norad_id), (0.0, 10.0, 40.0))
|
||||
_, period, max_el = arc_params
|
||||
passes = []
|
||||
for i in range(4):
|
||||
aos = now + _dt.timedelta(minutes=30 * (i + 1))
|
||||
tca = aos + _dt.timedelta(minutes=period / 2)
|
||||
los = aos + _dt.timedelta(minutes=period)
|
||||
duration = int(period * 60)
|
||||
passes.append(
|
||||
PassPrediction(
|
||||
satellite_name=name,
|
||||
norad_id=norad_id,
|
||||
aos_time=aos.strftime("%Y-%m-%dT%H:%M:%S"),
|
||||
aos_az=90.0 + random.uniform(-10, 10),
|
||||
tca_time=tca.strftime("%Y-%m-%dT%H:%M:%S"),
|
||||
tca_alt=max_el,
|
||||
tca_az=180.0 + random.uniform(-15, 15),
|
||||
los_time=los.strftime("%Y-%m-%dT%H:%M:%S"),
|
||||
los_az=270.0 + random.uniform(-10, 10),
|
||||
max_elevation=max_el,
|
||||
duration_seconds=duration,
|
||||
is_visible=i < 2,
|
||||
)
|
||||
)
|
||||
return passes
|
||||
|
||||
def get_next_pass(self, norad_id: int) -> PassPrediction | None:
|
||||
passes = self.get_passes(norad_id)
|
||||
return passes[0] if passes else None
|
||||
|
||||
def get_visible_targets(self, min_alt: float = 0.0) -> list[TargetPosition]:
|
||||
t = time.monotonic() - self._t0
|
||||
targets = []
|
||||
|
||||
for entry in _DEMO_CATALOG:
|
||||
tid = entry["id"]
|
||||
ttype = entry["type"]
|
||||
|
||||
if ttype == "satellite" and tid in _LEO_ARCS:
|
||||
az, el = _leo_position(tid, t)
|
||||
elif ttype == "celestial":
|
||||
az, el = _celestial_position(tid, t)
|
||||
else:
|
||||
continue
|
||||
|
||||
if el < min_alt:
|
||||
continue
|
||||
|
||||
# Synthetic distance/range-rate for LEO targets
|
||||
if ttype == "satellite":
|
||||
dist = 400.0 + 200.0 * math.cos(math.pi * el / 90.0)
|
||||
rr = -2.0 + 4.0 * math.sin(t / 120.0)
|
||||
else:
|
||||
dist = 384400.0 if tid == "moon" else 0.0
|
||||
rr = 0.0
|
||||
|
||||
targets.append(
|
||||
TargetPosition(
|
||||
name=entry["name"],
|
||||
target_type=ttype,
|
||||
target_id=tid,
|
||||
azimuth=round(az, 2),
|
||||
altitude=round(el, 2),
|
||||
distance_km=round(dist, 1),
|
||||
range_rate=round(rr, 3),
|
||||
)
|
||||
)
|
||||
|
||||
return targets
|
||||
160
tui/scripts/capture_screenshots.py
Normal file
160
tui/scripts/capture_screenshots.py
Normal file
@ -0,0 +1,160 @@
|
||||
"""Capture all TUI screenshots for documentation.
|
||||
|
||||
Runs the Birdcage TUI in demo mode using Textual's Pilot API,
|
||||
navigates to each screen/sub-mode, and exports SVG + PNG screenshots.
|
||||
|
||||
Usage:
|
||||
cd tui && uv run python scripts/capture_screenshots.py
|
||||
|
||||
Output goes to ../site/public/screenshots/
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import subprocess
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
sys.path.insert(0, str(Path(__file__).resolve().parent.parent / "src"))
|
||||
|
||||
from birdcage_tui.app import BirdcageApp
|
||||
|
||||
OUTPUT_DIR = (
|
||||
Path(__file__).resolve().parent.parent.parent / "site" / "public" / "screenshots"
|
||||
)
|
||||
TERMINAL_SIZE = (120, 42)
|
||||
|
||||
# Demo search results for craft-search screenshot
|
||||
DEMO_SEARCH_RESULTS = [
|
||||
{
|
||||
"name": "ISS (ZARYA)",
|
||||
"target_type": "satellite",
|
||||
"target_id": "25544",
|
||||
"groups": ["stations"],
|
||||
},
|
||||
{
|
||||
"name": "NOAA 19",
|
||||
"target_type": "satellite",
|
||||
"target_id": "33591",
|
||||
"groups": ["weather"],
|
||||
},
|
||||
{
|
||||
"name": "SO-50 (SAUDISAT 1C)",
|
||||
"target_type": "satellite",
|
||||
"target_id": "27607",
|
||||
"groups": ["amateur"],
|
||||
},
|
||||
{
|
||||
"name": "TEVEL-2",
|
||||
"target_type": "satellite",
|
||||
"target_id": "50988",
|
||||
"groups": ["amateur"],
|
||||
},
|
||||
{
|
||||
"name": "AO-91 (FOX-1B)",
|
||||
"target_type": "satellite",
|
||||
"target_id": "43017",
|
||||
"groups": ["amateur"],
|
||||
},
|
||||
]
|
||||
|
||||
|
||||
def save(name: str, svg: str) -> None:
|
||||
"""Write SVG and convert to PNG via rsvg-convert."""
|
||||
svg_path = OUTPUT_DIR / f"{name}.svg"
|
||||
png_path = OUTPUT_DIR / f"{name}.png"
|
||||
svg_path.write_text(svg)
|
||||
print(f" {svg_path.name}")
|
||||
try:
|
||||
subprocess.run(
|
||||
["rsvg-convert", "-o", str(png_path), str(svg_path)],
|
||||
check=True,
|
||||
capture_output=True,
|
||||
)
|
||||
print(f" {png_path.name}")
|
||||
except FileNotFoundError:
|
||||
print(" WARNING: rsvg-convert not found, skipping PNG")
|
||||
|
||||
|
||||
async def capture_all() -> None:
|
||||
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
app = BirdcageApp()
|
||||
app.demo_mode = True
|
||||
|
||||
async with app.run_test(size=TERMINAL_SIZE) as pilot:
|
||||
await pilot.pause(1.0)
|
||||
|
||||
# ---- F1 Dashboard ----
|
||||
print("F1 Dashboard")
|
||||
await pilot.press("f1")
|
||||
await pilot.pause(0.5)
|
||||
save("tui-dashboard", app.export_screenshot())
|
||||
|
||||
# ---- F2 Control (Manual mode) ----
|
||||
print("F2 Control")
|
||||
await pilot.press("f2")
|
||||
await pilot.pause(0.5)
|
||||
save("tui-control", app.export_screenshot())
|
||||
|
||||
# ---- F2 Control > Craft (search results) ----
|
||||
print("F2 Control > Craft (search)")
|
||||
craft_mode_btn = app.query_one("#mode-craft")
|
||||
craft_mode_btn.press()
|
||||
await pilot.pause(0.5)
|
||||
|
||||
# Populate search results directly via widget API
|
||||
craft_panel = app.query_one("#ctrl-craft-panel")
|
||||
craft_panel.set_search_results(DEMO_SEARCH_RESULTS)
|
||||
await pilot.pause(0.3)
|
||||
save("tui-craft-search", app.export_screenshot())
|
||||
|
||||
# ---- F2 Control > Craft (tracking Moon) ----
|
||||
print("F2 Control > Craft (tracking)")
|
||||
# Clear results and set tracking state
|
||||
craft_panel.set_search_results([])
|
||||
craft_panel.set_tracking_status(
|
||||
state="TRACKING",
|
||||
target_name="Moon",
|
||||
azimuth=145.00,
|
||||
elevation=32.01,
|
||||
distance_km=384400,
|
||||
range_rate=-0.3,
|
||||
moves=47,
|
||||
rate=1.0,
|
||||
)
|
||||
await pilot.pause(0.3)
|
||||
save("tui-craft-tracking", app.export_screenshot())
|
||||
|
||||
# ---- F3 Signal (Monitor mode) ----
|
||||
print("F3 Signal")
|
||||
await pilot.press("f3")
|
||||
await pilot.pause(0.5)
|
||||
save("tui-signal", app.export_screenshot())
|
||||
|
||||
# ---- F4 System ----
|
||||
print("F4 System")
|
||||
await pilot.press("f4")
|
||||
await pilot.pause(0.5)
|
||||
save("tui-system", app.export_screenshot())
|
||||
|
||||
# ---- F5 Console overlay ----
|
||||
print("F5 Console")
|
||||
await pilot.press("f5")
|
||||
await pilot.pause(0.5)
|
||||
save("tui-console", app.export_screenshot())
|
||||
await pilot.press("f5")
|
||||
await pilot.pause(0.3)
|
||||
|
||||
# ---- F6 Camera overlay ----
|
||||
print("F6 Camera")
|
||||
await pilot.press("f6")
|
||||
await pilot.pause(0.5)
|
||||
save("tui-camera", app.export_screenshot())
|
||||
await pilot.press("f6")
|
||||
await pilot.pause(0.3)
|
||||
|
||||
print("\nDone!")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(capture_all())
|
||||
@ -1,21 +1,31 @@
|
||||
"""Take screenshots of all TUI screens in demo mode for documentation."""
|
||||
"""Take screenshots of all TUI screens in demo mode for documentation.
|
||||
|
||||
Generates SVG snapshots of every screen and sub-mode including overlays.
|
||||
All data comes from DemoDevice and DemoCraftClient — no hardware or
|
||||
network required.
|
||||
|
||||
Usage:
|
||||
cd tui && uv run python scripts/take_screenshots.py
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
|
||||
from birdcage_tui.app import BirdcageApp
|
||||
|
||||
SCREENS = {
|
||||
"f1": "dashboard",
|
||||
"f2": "control",
|
||||
"f3": "signal",
|
||||
"f4": "system",
|
||||
}
|
||||
from birdcage_tui.screens.control import ControlScreen
|
||||
from birdcage_tui.widgets.craft_panel import CraftPanel
|
||||
|
||||
OUT_DIR = "/home/rpm/claude/ham/satellite/winegard-travler/site/public/screenshots"
|
||||
|
||||
|
||||
async def main():
|
||||
for key, name in SCREENS.items():
|
||||
async def _screenshot_basic_tabs():
|
||||
"""F1 Dashboard, F2 Control (manual), F3 Signal, F4 System."""
|
||||
tabs = {
|
||||
"f1": "dashboard",
|
||||
"f2": "control",
|
||||
"f3": "signal",
|
||||
"f4": "system",
|
||||
}
|
||||
for key, name in tabs.items():
|
||||
app = BirdcageApp()
|
||||
app.demo_mode = True
|
||||
|
||||
@ -23,7 +33,6 @@ async def main():
|
||||
await pilot.pause()
|
||||
await pilot.press(key)
|
||||
await pilot.pause()
|
||||
# Let workers populate data.
|
||||
await asyncio.sleep(1.5)
|
||||
|
||||
path = f"{OUT_DIR}/tui-{name}.svg"
|
||||
@ -31,4 +40,107 @@ async def main():
|
||||
print(f"Saved {path}")
|
||||
|
||||
|
||||
async def _screenshot_craft_search():
|
||||
"""F2 Control > Craft sub-mode with search results."""
|
||||
app = BirdcageApp()
|
||||
app.demo_mode = True
|
||||
|
||||
async with app.run_test(size=(120, 40)) as pilot:
|
||||
await pilot.pause()
|
||||
await pilot.press("f2")
|
||||
await pilot.pause()
|
||||
|
||||
control = app.query_one("#control", ControlScreen)
|
||||
control.switch_mode("craft")
|
||||
await pilot.pause()
|
||||
|
||||
# Trigger a search — uses DemoCraftClient
|
||||
panel = app.query_one("#ctrl-craft-panel", CraftPanel)
|
||||
panel.post_message(CraftPanel.SearchRequested(""))
|
||||
await asyncio.sleep(1.0)
|
||||
|
||||
path = f"{OUT_DIR}/tui-craft-search.svg"
|
||||
app.save_screenshot(path)
|
||||
print(f"Saved {path}")
|
||||
|
||||
|
||||
async def _screenshot_craft_tracking():
|
||||
"""F2 Control > Craft sub-mode actively tracking the Moon."""
|
||||
app = BirdcageApp()
|
||||
app.demo_mode = True
|
||||
|
||||
async with app.run_test(size=(120, 40)) as pilot:
|
||||
await pilot.pause()
|
||||
await pilot.press("f2")
|
||||
await pilot.pause()
|
||||
|
||||
control = app.query_one("#control", ControlScreen)
|
||||
control.switch_mode("craft")
|
||||
await pilot.pause()
|
||||
|
||||
# Start tracking the Moon (always above horizon in demo)
|
||||
panel = app.query_one("#ctrl-craft-panel", CraftPanel)
|
||||
panel.post_message(
|
||||
CraftPanel.TrackRequested(
|
||||
target_type="celestial",
|
||||
target_id="moon",
|
||||
name="Moon",
|
||||
min_el=5.0,
|
||||
)
|
||||
)
|
||||
await asyncio.sleep(2.5)
|
||||
|
||||
path = f"{OUT_DIR}/tui-craft-tracking.svg"
|
||||
app.save_screenshot(path)
|
||||
print(f"Saved {path}")
|
||||
|
||||
control._stop_craft_tracking()
|
||||
await pilot.pause()
|
||||
|
||||
|
||||
async def _screenshot_console():
|
||||
"""F5 Console overlay."""
|
||||
app = BirdcageApp()
|
||||
app.demo_mode = True
|
||||
|
||||
async with app.run_test(size=(120, 40)) as pilot:
|
||||
await pilot.pause()
|
||||
await asyncio.sleep(1.0)
|
||||
|
||||
await pilot.press("f5")
|
||||
await pilot.pause()
|
||||
await asyncio.sleep(0.5)
|
||||
|
||||
path = f"{OUT_DIR}/tui-console.svg"
|
||||
app.save_screenshot(path)
|
||||
print(f"Saved {path}")
|
||||
|
||||
|
||||
async def _screenshot_camera():
|
||||
"""F6 Camera overlay."""
|
||||
app = BirdcageApp()
|
||||
app.demo_mode = True
|
||||
|
||||
async with app.run_test(size=(120, 40)) as pilot:
|
||||
await pilot.pause()
|
||||
await asyncio.sleep(1.0)
|
||||
|
||||
await pilot.press("f6")
|
||||
await pilot.pause()
|
||||
await asyncio.sleep(0.5)
|
||||
|
||||
path = f"{OUT_DIR}/tui-camera.svg"
|
||||
app.save_screenshot(path)
|
||||
print(f"Saved {path}")
|
||||
|
||||
|
||||
async def main():
|
||||
await _screenshot_basic_tabs()
|
||||
await _screenshot_craft_search()
|
||||
await _screenshot_craft_tracking()
|
||||
await _screenshot_console()
|
||||
await _screenshot_camera()
|
||||
print(f"\nAll screenshots saved to {OUT_DIR}/")
|
||||
|
||||
|
||||
asyncio.run(main())
|
||||
|
||||
@ -102,15 +102,14 @@ class BirdcageApp(App):
|
||||
def _setup_device(self) -> None:
|
||||
"""Create device (demo or real) and hand it to each screen."""
|
||||
if self.demo_mode:
|
||||
from birdcage_tui.demo import DemoDevice
|
||||
from birdcage.demo import DemoDevice
|
||||
|
||||
self.device = DemoDevice()
|
||||
self.device.connect()
|
||||
else:
|
||||
from birdcage.bridge import SerialBridge
|
||||
from birdcage.protocol import get_protocol
|
||||
|
||||
from birdcage_tui.bridge import SerialBridge
|
||||
|
||||
protocol = get_protocol(self.firmware_name)
|
||||
self.device = SerialBridge(protocol)
|
||||
self.device.connect(self.serial_port)
|
||||
@ -141,9 +140,14 @@ class BirdcageApp(App):
|
||||
|
||||
def _setup_craft_client(self) -> None:
|
||||
"""Create a Craft API client and hand it to the control screen."""
|
||||
from birdcage_tui.craft_client import CraftClient
|
||||
if self.demo_mode:
|
||||
from birdcage.demo import DemoCraftClient
|
||||
|
||||
client = CraftClient(base_url=self.craft_url)
|
||||
client = DemoCraftClient()
|
||||
else:
|
||||
from birdcage.craft_client import CraftClient
|
||||
|
||||
client = CraftClient(base_url=self.craft_url)
|
||||
try:
|
||||
control = self.query_one("#control")
|
||||
if hasattr(control, "set_craft_client"):
|
||||
|
||||
@ -9,6 +9,7 @@ import contextlib
|
||||
import logging
|
||||
import threading
|
||||
|
||||
from birdcage.craft_client import CraftClient, CraftTrackingState
|
||||
from textual import work
|
||||
from textual.app import ComposeResult
|
||||
from textual.binding import Binding
|
||||
@ -16,7 +17,6 @@ from textual.containers import Container, Horizontal, Vertical
|
||||
from textual.widgets import Button, ContentSwitcher, Input, Static
|
||||
from textual.worker import Worker
|
||||
|
||||
from birdcage_tui.craft_client import CraftClient, CraftTrackingState
|
||||
from birdcage_tui.rotctld_server import TrackingState, TuiRotctldServer
|
||||
from birdcage_tui.widgets.compass_rose import CompassRose
|
||||
from birdcage_tui.widgets.craft_panel import CraftPanel
|
||||
|
||||
@ -115,7 +115,7 @@ class SignalScreen(Container):
|
||||
yield Static(" Hz", classes="label")
|
||||
yield Button("Start", id="btn-start", variant="primary")
|
||||
yield Button("Stop", id="btn-stop")
|
||||
yield Button("Enable LNA", id="btn-lna")
|
||||
yield Button("V-pol 13V", id="btn-lna")
|
||||
yield Button("Reset Peak", id="btn-reset-peak")
|
||||
|
||||
# -- Sweep mode -------------------------------------------
|
||||
@ -350,9 +350,13 @@ class SignalScreen(Container):
|
||||
label = "YES" if locked else "NO"
|
||||
self.query_one("#lock-status", Static).update(f" Lock: {label}")
|
||||
|
||||
def _update_lna_label(self) -> None:
|
||||
label = "ON" if self._lna_enabled else "OFF"
|
||||
self.query_one("#lna-status", Static).update(f" LNA: {label}")
|
||||
def _update_lna_ui(self) -> None:
|
||||
if self._lna_enabled:
|
||||
self.query_one("#lna-status", Static).update(" LNA: ON (V-pol)")
|
||||
self.query_one("#btn-lna", Button).label = "H-pol 18V"
|
||||
else:
|
||||
self.query_one("#lna-status", Static).update(" LNA: OFF (H-pol)")
|
||||
self.query_one("#btn-lna", Button).label = "V-pol 13V"
|
||||
|
||||
def _update_status_strip_rssi(self, rssi_avg: int) -> None:
|
||||
"""Push RSSI to the app-level StatusStrip."""
|
||||
@ -386,19 +390,29 @@ class SignalScreen(Container):
|
||||
if self._device is None:
|
||||
self.app.notify("No device connected", severity="warning")
|
||||
return
|
||||
self._do_enable_lna()
|
||||
self._do_toggle_lnb()
|
||||
|
||||
@work(thread=True, exclusive=False, group="signal-cmd")
|
||||
def _do_enable_lna(self) -> None:
|
||||
def _do_toggle_lnb(self) -> None:
|
||||
try:
|
||||
self._device.enable_lna()
|
||||
self._lna_enabled = True
|
||||
self.app.call_from_thread(self._update_lna_label)
|
||||
self.app.call_from_thread(self.app.notify, "LNA enabled (13V ODU)")
|
||||
if self._lna_enabled:
|
||||
self._device.set_lnb_voltage("stb")
|
||||
self._lna_enabled = False
|
||||
self.app.call_from_thread(self._update_lna_ui)
|
||||
self.app.call_from_thread(
|
||||
self.app.notify, "LNB set to 18V (H-pol)"
|
||||
)
|
||||
else:
|
||||
self._device.set_lnb_voltage("odu")
|
||||
self._lna_enabled = True
|
||||
self.app.call_from_thread(self._update_lna_ui)
|
||||
self.app.call_from_thread(
|
||||
self.app.notify, "LNB set to 13V (V-pol, LNA on)"
|
||||
)
|
||||
except Exception:
|
||||
log.exception("LNA enable failed")
|
||||
log.exception("LNB voltage switch failed")
|
||||
self.app.call_from_thread(
|
||||
self.app.notify, "LNA enable failed", severity="error"
|
||||
self.app.notify, "LNB voltage switch failed", severity="error"
|
||||
)
|
||||
|
||||
def _handle_reset_peak(self) -> None:
|
||||
|
||||
@ -13,7 +13,7 @@ from unittest.mock import MagicMock
|
||||
import pytest
|
||||
|
||||
from birdcage_tui.app import BirdcageApp
|
||||
from birdcage_tui.craft_client import PassPrediction, SearchResult, TargetPosition
|
||||
from birdcage.craft_client import PassPrediction, SearchResult, TargetPosition
|
||||
from birdcage_tui.screens.control import ControlScreen
|
||||
from birdcage_tui.widgets.craft_panel import CraftPanel, CraftTrackingStatus
|
||||
|
||||
|
||||
164
tui/tests/test_demo_craft.py
Normal file
164
tui/tests/test_demo_craft.py
Normal file
@ -0,0 +1,164 @@
|
||||
"""Test DemoCraftClient — full Craft lifecycle in demo mode.
|
||||
|
||||
Unlike test_craft_mode.py which uses MagicMock, these tests run with the
|
||||
real DemoCraftClient to verify the end-to-end demo experience: search,
|
||||
pass predictions, and tracking with time-varying satellite positions.
|
||||
|
||||
No network calls are made.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
|
||||
import pytest
|
||||
|
||||
from birdcage_tui.app import BirdcageApp
|
||||
from birdcage_tui.screens.control import ControlScreen
|
||||
from birdcage_tui.widgets.craft_panel import CraftPanel, CraftTrackingStatus
|
||||
|
||||
|
||||
async def _switch_to_craft(pilot, app) -> None:
|
||||
"""Navigate to F2 Control > Craft sub-mode."""
|
||||
await pilot.press("f2")
|
||||
await pilot.pause()
|
||||
control = app.query_one("#control", ControlScreen)
|
||||
control.switch_mode("craft")
|
||||
await pilot.pause()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_demo_search_returns_results():
|
||||
"""DemoCraftClient search should populate the DataTable."""
|
||||
app = BirdcageApp()
|
||||
app.demo_mode = True
|
||||
|
||||
async with app.run_test(size=(120, 40)) as pilot:
|
||||
await pilot.pause()
|
||||
await _switch_to_craft(pilot, app)
|
||||
|
||||
panel = app.query_one("#ctrl-craft-panel", CraftPanel)
|
||||
panel.post_message(CraftPanel.SearchRequested("ISS"))
|
||||
await asyncio.sleep(0.5)
|
||||
|
||||
from textual.widgets import DataTable
|
||||
|
||||
table = app.query_one("#craft-results-table", DataTable)
|
||||
assert table.row_count >= 1
|
||||
first_key = list(table.rows.keys())[0]
|
||||
row = table.get_row(first_key)
|
||||
assert "ISS" in row[0]
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_demo_broad_search():
|
||||
"""Empty query should return the full demo catalog."""
|
||||
app = BirdcageApp()
|
||||
app.demo_mode = True
|
||||
|
||||
async with app.run_test(size=(120, 40)) as pilot:
|
||||
await pilot.pause()
|
||||
await _switch_to_craft(pilot, app)
|
||||
|
||||
panel = app.query_one("#ctrl-craft-panel", CraftPanel)
|
||||
panel.post_message(CraftPanel.SearchRequested(""))
|
||||
await asyncio.sleep(0.5)
|
||||
|
||||
from textual.widgets import DataTable
|
||||
|
||||
table = app.query_one("#craft-results-table", DataTable)
|
||||
assert table.row_count == 8 # Full demo catalog
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_demo_passes():
|
||||
"""DemoCraftClient should return synthetic pass predictions."""
|
||||
app = BirdcageApp()
|
||||
app.demo_mode = True
|
||||
|
||||
async with app.run_test(size=(120, 40)) as pilot:
|
||||
await pilot.pause()
|
||||
await _switch_to_craft(pilot, app)
|
||||
|
||||
control = app.query_one("#control", ControlScreen)
|
||||
panel = app.query_one("#ctrl-craft-panel", CraftPanel)
|
||||
|
||||
panel.post_message(CraftPanel.PassesRequested(norad_id=25544))
|
||||
await asyncio.sleep(0.5)
|
||||
|
||||
from birdcage_tui.widgets.craft_panel import CraftPassInfo
|
||||
|
||||
info = app.query_one("#craft-pass-info", CraftPassInfo)
|
||||
# Should contain max elevation of 55.0 degrees (ISS arc)
|
||||
assert "55.0" in info.passes_text
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_demo_tracking_drives_device():
|
||||
"""DemoCraftClient tracking should move the demo device."""
|
||||
app = BirdcageApp()
|
||||
app.demo_mode = True
|
||||
|
||||
async with app.run_test(size=(120, 40)) as pilot:
|
||||
await pilot.pause()
|
||||
await _switch_to_craft(pilot, app)
|
||||
|
||||
panel = app.query_one("#ctrl-craft-panel", CraftPanel)
|
||||
|
||||
# Track the Moon — always above horizon in demo
|
||||
panel.post_message(
|
||||
CraftPanel.TrackRequested(
|
||||
target_type="celestial",
|
||||
target_id="moon",
|
||||
name="Moon",
|
||||
min_el=5.0,
|
||||
)
|
||||
)
|
||||
await asyncio.sleep(2.5)
|
||||
|
||||
status = app.query_one("#craft-tracking-status", CraftTrackingStatus)
|
||||
assert status.state == "TRACKING"
|
||||
assert status.moves >= 1
|
||||
|
||||
# Device should have been commanded toward the Moon's position
|
||||
assert app.device._target_az > 100.0 # Moon is around AZ 145
|
||||
assert app.device._target_el > 20.0 # Moon is around EL 32
|
||||
|
||||
# Screenshot for visual verification
|
||||
app.save_screenshot("/tmp/birdcage_demo_craft_tracking.svg")
|
||||
|
||||
# Stop tracking
|
||||
control = app.query_one("#control", ControlScreen)
|
||||
control._stop_craft_tracking()
|
||||
await pilot.pause()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_demo_tracking_waiting_below_horizon():
|
||||
"""Tracking a below-horizon LEO target shows WAITING."""
|
||||
app = BirdcageApp()
|
||||
app.demo_mode = True
|
||||
|
||||
async with app.run_test(size=(120, 40)) as pilot:
|
||||
await pilot.pause()
|
||||
await _switch_to_craft(pilot, app)
|
||||
|
||||
panel = app.query_one("#ctrl-craft-panel", CraftPanel)
|
||||
|
||||
# SO-50 may be below horizon depending on timing.
|
||||
# Use Jupiter at min_el=90 to guarantee WAITING.
|
||||
panel.post_message(
|
||||
CraftPanel.TrackRequested(
|
||||
target_type="celestial",
|
||||
target_id="jupiter",
|
||||
name="Jupiter",
|
||||
min_el=90.0, # Impossible — forces WAITING
|
||||
)
|
||||
)
|
||||
await asyncio.sleep(2.0)
|
||||
|
||||
status = app.query_one("#craft-tracking-status", CraftTrackingStatus)
|
||||
assert status.state == "WAITING"
|
||||
|
||||
# Cleanup
|
||||
control = app.query_one("#control", ControlScreen)
|
||||
control._stop_craft_tracking()
|
||||
await pilot.pause()
|
||||
Loading…
x
Reference in New Issue
Block a user