Compare commits

...

6 Commits

Author SHA1 Message Date
b98a5482fa Document screenshot capture workflow and MCP server 2026-02-17 17:54:35 -07:00
b0aee4e5a6 Add automated TUI screenshot capture script
Textual Pilot-based script that launches demo mode, navigates
all screens, and exports SVG + PNG via rsvg-convert.
2026-02-17 17:49:01 -07:00
f8bfd69ceb Add LNB polarity toggle (V-pol 13V / H-pol 18V)
Bridge: set_lnb_voltage(mode) wraps firmware lnbdc command, enable_lna()
now delegates to it. MCP: new set_lnb_voltage tool + 3 tests. TUI: Signal
screen button toggles between V-pol and H-pol instead of one-way LNA enable.
2026-02-17 17:24:05 -07:00
8a6b99bd8c Add birdcage-mcp FastMCP server for satellite dish control
34 tools (connection, movement, signal, system, satellite, console),
5 resources, 3 prompts. Backed by DemoDevice for offline testing.
46 tests passing against the demo backend via run_server_async.
2026-02-17 16:01:51 -07:00
16ca4892b3 Promote bridge, demo, craft_client to core birdcage package
Move bridge.py, demo.py, craft_client.py from tui/src/birdcage_tui/ to
src/birdcage/ so both TUI and MCP server can share the device layer
without a circular dependency on textual.
2026-02-17 16:01:38 -07:00
3013eeee4c Add DemoCraftClient for complete offline demo mode
- DemoCraftClient in demo.py: duck-typed CraftClient replacement with
  8 canned satellites, synthetic pass predictions, and time-varying LEO
  arcs that drive real AOS/TCA/LOS pass events to the camera overlay
- Fix sky map EL signal fidelity: snap _el to _target_el at sweep start
  so 2D scans compute correct per-row RSSI (was using stale elevation)
- Branch on demo_mode in app.py _setup_craft_client() to inject
  DemoCraftClient instead of the HTTP CraftClient
- Add test_demo_craft.py: 5 tests exercising search, passes, tracking,
  and WAITING state through the full TUI without mocks
- Update take_screenshots.py to cover all 8 screens (dashboard, control,
  craft search, craft tracking, signal, system, console, camera)
2026-02-16 11:49:39 -07:00
33 changed files with 4134 additions and 44 deletions

View File

@ -739,3 +739,75 @@ ssh -A warehack-ing@warehack.ing "cd birdcage-docs && git pull && make prod"
```
TLS is automatic via caddy-docker-proxy (ACME + Vultr DNS challenge). New subdomains take ~2 minutes for certificate issuance.
### Screenshots
TUI screenshots live in `site/public/screenshots/` (the site repo, not the main repo).
An automated capture script uses Textual's Pilot API to render each screen in demo mode.
**Regenerating all screenshots:**
```bash
cd tui && uv run python scripts/capture_screenshots.py
```
This captures 8 SVG + 8 PNG screenshots plus a 2×3 collage:
- `tui-dashboard` — F1 action cards
- `tui-control` — F2 Manual mode with compass rose
- `tui-craft-search` — F2 Craft mode with satellite search results
- `tui-craft-tracking` — F2 Craft mode tracking the Moon
- `tui-signal` — F3 Monitor mode with RSSI gauge and receiver info
- `tui-system` — F4 Hardware mode with firmware ID and A3981 diagnostics
- `tui-console` — F5 overlay with serial console
- `tui-camera` — F6 overlay with capture triggers
- `tui-collage` — 2×3 montage of 6 main screens (via ImageMagick `montage`)
**Dependencies:** `rsvg-convert` (SVG→PNG), `montage` (collage).
**When to regenerate:** After any UI change that affects widget layout, button labels,
or screen content. The script populates Craft search results and tracking state
directly via widget API for deterministic screenshots.
## MCP Server
| Property | Value |
|----------|-------|
| Package | `birdcage-mcp` |
| Source | `mcp/src/birdcage_mcp/` |
| Entry point | `birdcage_mcp.server:main` |
| Tools | 35 (connection, movement, signal, system, satellite, console) |
| Resources | 5 (`birdcage://config`, `position`, `firmware`, `motor-dynamics`, `el-limits`) |
| Prompts | 3 (`setup_wizard`, `satellite_tracking_guide`, `rf_sweep_guide`) |
| Tests | `mcp/tests/` — 49 tests against DemoDevice via `run_server_async` |
### Running
```bash
# Demo mode (no hardware)
BIRDCAGE_DEMO=1 uv run --directory mcp birdcage-mcp
# Hardware mode
BIRDCAGE_PORT=/dev/ttyUSB2 uv run --directory mcp birdcage-mcp
```
### Adding to Claude Code
```bash
claude mcp add birdcage-mcp -- env BIRDCAGE_DEMO=1 uv run --directory mcp birdcage-mcp
```
### Environment Variables
| Variable | Default | Purpose |
|----------|---------|---------|
| `BIRDCAGE_DEMO` | `false` | Enable demo mode (DemoDevice + DemoCraftClient) |
| `BIRDCAGE_PORT` | `/dev/ttyUSB0` | Serial port for hardware mode |
| `BIRDCAGE_FIRMWARE` | `g2` | Firmware variant (`g2`, `hal205`, etc.) |
| `BIRDCAGE_CRAFT_URL` | `https://space.warehack.ing` | Orbital prediction API |
### Testing
```bash
cd mcp && uv run pytest tests/ # 49 tests via FastMCP run_server_async
uv run ruff check src/ # Lint
```

34
mcp/pyproject.toml Normal file
View File

@ -0,0 +1,34 @@
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[project]
name = "birdcage-mcp"
version = "2026.02.17"
description = "FastMCP server for Winegard satellite dish control"
license = "MIT"
requires-python = ">=3.11"
authors = [{name = "Ryan Malloy", email = "ryan@supported.systems"}]
dependencies = [
"birdcage",
"fastmcp>=2.0",
]
[project.scripts]
birdcage-mcp = "birdcage_mcp.server:main"
[tool.uv.sources]
birdcage = { path = "..", editable = true }
[tool.ruff]
target-version = "py311"
src = ["src"]
[tool.ruff.lint]
select = ["E", "F", "I", "UP", "B", "SIM"]
[tool.hatch.build.targets.wheel]
packages = ["src/birdcage_mcp"]
[dependency-groups]
dev = ["pytest>=9.0.2", "pytest-asyncio>=1.3.0"]

View File

@ -0,0 +1 @@
"""birdcage-mcp: FastMCP server for Winegard satellite dish control."""

View File

@ -0,0 +1,78 @@
"""MCP prompts — guided workflows for common dish operations."""
def register(mcp):
"""Register MCP prompts on the FastMCP instance."""
@mcp.prompt()
async def setup_wizard() -> str:
"""Step-by-step guide to connect and initialize the satellite dish.
Walks through: check status -> connect to serial port -> verify
firmware -> home motors -> confirm position.
"""
return (
"Follow these steps to set up the satellite dish:\n\n"
"1. Call `status` to check the current connection state.\n"
"2. If not connected, call `connect` with the correct port "
"and firmware variant.\n"
"3. Call `get_firmware_id` to verify the firmware version.\n"
"4. Call `home_motor` with motor_id=0 (AZ) then motor_id=1 (EL) "
"to establish reference positions.\n"
"5. Call `get_position` to verify the dish is at the expected "
"home coordinates.\n"
"6. Call `get_el_limits` to confirm the elevation operating range.\n\n"
"The dish is now ready for tracking or manual positioning."
)
@mcp.prompt()
async def satellite_tracking_guide() -> str:
"""Guide for tracking a satellite pass with the dish.
Walks through: search -> get passes -> wait for AOS -> poll
position at 1 Hz -> move dish -> detect LOS.
"""
return (
"Follow these steps to track a satellite:\n\n"
"1. Call `search_satellites` with the satellite name "
"(e.g. 'ISS', 'NOAA 19').\n"
"2. Note the NORAD ID from the search results.\n"
"3. Call `get_passes` with the NORAD ID to see upcoming passes.\n"
"4. Choose a pass with good max elevation (>30 deg is ideal).\n"
"5. When the pass is about to start (AOS time), begin a tracking "
"loop:\n"
" a. Call `get_visible_targets` to get the satellite's current "
"AZ/EL.\n"
" b. Call `move_to` with those coordinates.\n"
" c. Wait ~1 second, then repeat from (a).\n"
"6. Stop tracking when the satellite drops below your minimum "
"elevation.\n\n"
"TIP: Enable the LNA first with `enable_lna` if you want to "
"measure signal strength during the pass."
)
@mcp.prompt()
async def rf_sweep_guide() -> str:
"""Guide for performing an RF sky sweep with the dish.
Walks through: enable LNA -> baseline RSSI -> configure sweep
-> run az_sweep -> analyze results.
"""
return (
"Follow these steps for an RF sky sweep:\n\n"
"1. Call `enable_lna` to power the low-noise amplifier.\n"
"2. Call `get_rssi` to establish a baseline noise floor.\n"
"3. Decide your sweep parameters:\n"
" - start_az: Starting azimuth\n"
" - span: How many degrees to sweep\n"
" - step_cdeg: Resolution in centidegrees (100 = 1 deg)\n"
" - num_xponders: Transponders per position (1 for basic)\n"
"4. Call `az_sweep` with those parameters.\n"
"5. Analyze the returned points:\n"
" - Look for RSSI peaks above the noise floor (~500)\n"
" - Check lock=1 points for strong signals\n"
" - SNR values indicate signal quality\n\n"
"For a 2D sky map, repeat the sweep at different elevations "
"(e.g. EL 20 to 60 in 5 deg steps) using `move_motor` to "
"set each elevation before sweeping."
)

View File

@ -0,0 +1,57 @@
"""MCP resources — live data endpoints for dish state."""
import json
from fastmcp import Context
def register(mcp):
"""Register MCP resources on the FastMCP instance."""
@mcp.resource("birdcage://config")
async def config_resource(ctx: Context) -> str:
"""Current connection configuration."""
state = ctx.request_context.lifespan_context
data = {
"demo_mode": state.demo_mode,
"serial_port": state.serial_port,
"firmware": state.firmware_name,
"connected": state.connected,
}
return json.dumps(data, indent=2)
@mcp.resource("birdcage://position")
async def position_resource(ctx: Context) -> str:
"""Live dish position (queries hardware)."""
state = ctx.request_context.lifespan_context
if state.device is None:
return json.dumps({"error": "not connected"})
pos = state.device.get_position()
return json.dumps(pos, indent=2)
@mcp.resource("birdcage://firmware")
async def firmware_resource(ctx: Context) -> str:
"""Firmware identification string."""
state = ctx.request_context.lifespan_context
if state.device is None:
return json.dumps({"error": "not connected"})
fw_id = state.device.get_firmware_id()
return fw_id
@mcp.resource("birdcage://motor-dynamics")
async def motor_dynamics_resource(ctx: Context) -> str:
"""Motor velocity and acceleration limits."""
state = ctx.request_context.lifespan_context
if state.device is None:
return json.dumps({"error": "not connected"})
dynamics = state.device.get_motor_dynamics()
return json.dumps(dynamics, indent=2)
@mcp.resource("birdcage://el-limits")
async def el_limits_resource(ctx: Context) -> str:
"""Elevation angle limits (min, max, home)."""
state = ctx.request_context.lifespan_context
if state.device is None:
return json.dumps({"error": "not connected"})
limits = state.device.get_el_limits()
return json.dumps(limits, indent=2)

View File

@ -0,0 +1,92 @@
"""Birdcage MCP server — FastMCP entry point with lifespan management."""
import logging
import os
import sys
from contextlib import asynccontextmanager
from fastmcp import FastMCP
from birdcage_mcp.state import BirdcageState
logger = logging.getLogger(__name__)
@asynccontextmanager
async def lifespan(server: FastMCP):
"""Server lifespan: configure state from environment, optionally pre-connect."""
state = BirdcageState()
demo_env = os.environ.get("BIRDCAGE_DEMO", "false").lower()
state.demo_mode = demo_env in ("1", "true", "yes")
state.serial_port = os.environ.get("BIRDCAGE_PORT", "/dev/ttyUSB0")
state.firmware_name = os.environ.get("BIRDCAGE_FIRMWARE", "g2")
craft_url = os.environ.get(
"BIRDCAGE_CRAFT_URL", "https://space.warehack.ing"
)
if state.demo_mode:
from birdcage.demo import DemoCraftClient, DemoDevice
device = DemoDevice()
device.connect()
device.initialize()
state.device = device
state.craft_client = DemoCraftClient()
state.connected = True
logger.info("Demo mode: DemoDevice + DemoCraftClient ready")
else:
from birdcage.craft_client import CraftClient
state.craft_client = CraftClient(base_url=craft_url)
logger.info(
"Hardware mode: use 'connect' tool to open serial port %s",
state.serial_port,
)
try:
yield state
finally:
if state.device is not None and hasattr(state.device, "disconnect"):
try:
state.device.disconnect()
except Exception:
logger.debug("Disconnect error during shutdown", exc_info=True)
logger.info("Birdcage MCP server shut down")
mcp = FastMCP(
"birdcage",
instructions=(
"Control a Winegard satellite dish (Carryout G2 / Trav'ler) for "
"amateur radio satellite tracking. Provides motor control, signal "
"measurement, firmware management, and orbital tracking via the "
"Craft API."
),
lifespan=lifespan,
)
# Register tool modules
from birdcage_mcp import prompts, resources # noqa: E402, F401
from birdcage_mcp.tools import ( # noqa: E402
connection,
console,
movement,
satellite,
signal,
system,
)
connection.register(mcp)
movement.register(mcp)
signal.register(mcp)
system.register(mcp)
satellite.register(mcp)
console.register(mcp)
resources.register(mcp)
prompts.register(mcp)
def main():
print("birdcage-mcp v2026.02.17", file=sys.stderr)
mcp.run()

View File

@ -0,0 +1,29 @@
"""Session state for the Birdcage MCP server."""
from dataclasses import dataclass, field
@dataclass
class BirdcageState:
"""Mutable session state shared across all MCP tools.
Populated by the server lifespan and mutated by the ``connect``
and ``disconnect`` tools. Device and craft_client are typed as
``object`` to accept both real (SerialBridge/CraftClient) and
demo (DemoDevice/DemoCraftClient) implementations without
importing them at definition time.
"""
device: object = None
craft_client: object = None
demo_mode: bool = False
serial_port: str = ""
firmware_name: str = "g2"
connected: bool = False
_errors: list[str] = field(default_factory=list)
def record_error(self, msg: str) -> None:
"""Append an error message (capped at 50 entries)."""
self._errors.append(msg)
if len(self._errors) > 50:
self._errors = self._errors[-50:]

View File

@ -0,0 +1 @@
"""Birdcage MCP tool modules."""

View File

@ -0,0 +1,112 @@
"""Connection management tools — connect, disconnect, status."""
from fastmcp import Context
from birdcage_mcp.state import BirdcageState
def register(mcp):
"""Register connection tools on the FastMCP instance."""
@mcp.tool()
async def connect(
ctx: Context,
port: str = "",
firmware: str = "",
skip_init: bool = False,
) -> dict:
"""Connect to a Winegard satellite dish via RS-422/RS-485 serial.
In demo mode this is a no-op (already connected). In hardware mode,
opens the serial port and optionally runs the firmware initialization
sequence (kills TV satellite search, enters motor submenu).
Args:
port: Serial port path (e.g. /dev/ttyUSB0). Defaults to
BIRDCAGE_PORT env var.
firmware: Firmware variant: "g2", "hal205", or "hal000".
Defaults to BIRDCAGE_FIRMWARE env var.
skip_init: If True, skip firmware init (useful for
reconnecting to a running dish).
"""
state: BirdcageState = ctx.request_context.lifespan_context
if state.demo_mode:
return {
"status": "connected",
"mode": "demo",
"message": "Demo device already connected",
}
if state.connected and state.device is not None:
return {
"status": "already_connected",
"port": state.serial_port,
}
serial_port = port or state.serial_port
fw = firmware or state.firmware_name
from birdcage.bridge import SerialBridge
from birdcage.protocol import get_protocol
protocol = get_protocol(fw)
device = SerialBridge(protocol)
device.connect(serial_port)
if not skip_init:
device.initialize()
state.device = device
state.serial_port = serial_port
state.firmware_name = fw
state.connected = True
return {
"status": "connected",
"port": serial_port,
"firmware": fw,
"initialized": not skip_init,
}
@mcp.tool()
async def disconnect(ctx: Context) -> dict:
"""Disconnect from the satellite dish and release the serial port.
In demo mode this is a no-op. In hardware mode, returns to the
root menu and closes the serial connection cleanly.
"""
state: BirdcageState = ctx.request_context.lifespan_context
if state.demo_mode:
return {"status": "demo", "message": "Demo mode — nothing to disconnect"}
if state.device is None or not state.connected:
return {"status": "not_connected"}
state.device.disconnect()
state.device = None
state.connected = False
return {"status": "disconnected"}
@mcp.tool()
async def status(ctx: Context) -> dict:
"""Get current connection status and configuration.
Returns whether the dish is connected, the serial port, firmware
variant, demo mode flag, and current menu state (if connected).
"""
state: BirdcageState = ctx.request_context.lifespan_context
result = {
"connected": state.connected,
"demo_mode": state.demo_mode,
"serial_port": state.serial_port,
"firmware": state.firmware_name,
}
if state.device is not None and hasattr(state.device, "current_menu"):
result["current_menu"] = state.device.current_menu
return result

View File

@ -0,0 +1,49 @@
"""Raw console tool — safety-gated direct firmware access."""
from fastmcp import Context
from birdcage_mcp.state import BirdcageState
# Commands that are dangerous at the root menu level.
_BLOCKED_ROOT_COMMANDS = {"q"}
def register(mcp):
"""Register console tools on the FastMCP instance."""
@mcp.tool()
async def send_raw_command(ctx: Context, command: str) -> dict:
"""Send a raw command to the firmware console and return the response.
This provides direct access to the Winegard firmware shell. The
command is sent as-is and the raw response is returned.
SAFETY: The 'q' command at root menu is BLOCKED it kills the
UART shell and requires a hardware power cycle to recover.
HAZARDOUS COMMANDS (use with caution):
- ADC 'scan' without arguments on uncalibrated AZ: deadlocks shell
- NVS 'e <idx> <value>': writes to non-volatile storage
- 'reboot': reboots the microcontroller
- 'stow': folds dish flat (modified feeds may not survive)
After a raw command, the menu state is marked as unknown.
Args:
command: The firmware command string to send.
"""
state: BirdcageState = ctx.request_context.lifespan_context
if state.device is None:
raise RuntimeError("No device connected. Use the 'connect' tool first.")
cmd_stripped = command.strip().lower()
if cmd_stripped in _BLOCKED_ROOT_COMMANDS:
return {
"error": f"Command '{command}' is blocked for safety. "
"The 'q' command kills the UART shell and requires "
"a hardware power cycle to recover.",
"blocked": True,
}
raw = state.device.send_raw(command)
return {"response": raw}

View File

@ -0,0 +1,154 @@
"""Movement tools — motor positioning, homing, and engagement."""
from fastmcp import Context
from birdcage_mcp.state import BirdcageState
def _require_device(state: BirdcageState):
"""Raise if no device is connected."""
if state.device is None:
raise RuntimeError(
"No device connected. Use the 'connect' tool first."
)
def register(mcp):
"""Register movement tools on the FastMCP instance."""
@mcp.tool()
async def get_position(ctx: Context) -> dict:
"""Query the current azimuth and elevation of the dish.
Returns {"azimuth": float, "elevation": float} in degrees.
Azimuth range is 0-455 (before cable wrap). Elevation range
depends on firmware variant (typically 18-65 deg for Carryout G2).
"""
state: BirdcageState = ctx.request_context.lifespan_context
_require_device(state)
return state.device.get_position()
@mcp.tool()
async def move_to(
ctx: Context,
azimuth: float,
elevation: float,
) -> dict:
"""Move the dish to an absolute azimuth and elevation.
Both motors are commanded sequentially (AZ then EL). The firmware
queues commands, so this returns immediately the dish moves in
the background. Poll get_position to track progress.
Args:
azimuth: Target azimuth in degrees (0-455).
elevation: Target elevation in degrees (18-65 for Carryout G2).
"""
state: BirdcageState = ctx.request_context.lifespan_context
_require_device(state)
state.device.move_to(azimuth, elevation)
return {
"status": "moving",
"target_azimuth": azimuth,
"target_elevation": elevation,
}
@mcp.tool()
async def move_motor(
ctx: Context,
motor_id: int,
degrees: float,
) -> dict:
"""Move a single motor to an absolute position.
Args:
motor_id: 0 for azimuth, 1 for elevation.
degrees: Target angle in degrees.
"""
state: BirdcageState = ctx.request_context.lifespan_context
_require_device(state)
if motor_id not in (0, 1):
raise ValueError("motor_id must be 0 (azimuth) or 1 (elevation)")
state.device.move_motor(motor_id, degrees)
axis = "azimuth" if motor_id == 0 else "elevation"
return {"status": "moving", "axis": axis, "target": degrees}
@mcp.tool()
async def home_motor(ctx: Context, motor_id: int) -> dict:
"""Home a motor to its reference position using stall detection.
The dish uses motor stalling (not limit switches) to find the
mechanical home position. Expect audible grinding during homing.
Args:
motor_id: 0 for azimuth, 1 for elevation.
"""
state: BirdcageState = ctx.request_context.lifespan_context
_require_device(state)
if motor_id not in (0, 1):
raise ValueError("motor_id must be 0 (azimuth) or 1 (elevation)")
state.device.home_motor(motor_id)
axis = "azimuth" if motor_id == 0 else "elevation"
return {"status": "homing", "axis": axis}
@mcp.tool()
async def engage_motors(ctx: Context) -> dict:
"""Energize the stepper motors (apply holding torque).
Motors must be engaged before movement commands will work.
They draw power while engaged even when stationary.
"""
state: BirdcageState = ctx.request_context.lifespan_context
_require_device(state)
state.device.engage()
return {"status": "engaged"}
@mcp.tool()
async def release_motors(ctx: Context) -> dict:
"""De-energize the stepper motors (remove holding torque).
The dish will be free to move under wind/gravity load.
Use before manual repositioning or to save power.
"""
state: BirdcageState = ctx.request_context.lifespan_context
_require_device(state)
state.device.release()
return {"status": "released"}
@mcp.tool()
async def stow(ctx: Context) -> dict:
"""Move the dish to stow position (AZ=0, EL=65).
Stow position is safe for transport with the standard feed horn.
WARNING: Modified feeds may not survive the stow movement.
"""
state: BirdcageState = ctx.request_context.lifespan_context
_require_device(state)
state.device.move_to(0.0, 65.0)
return {
"status": "stowing",
"target_azimuth": 0.0,
"target_elevation": 65.0,
}
@mcp.tool()
async def get_step_positions(ctx: Context) -> dict:
"""Read raw stepper motor positions in step counts.
Returns {"az_steps": int, "el_steps": int}. The Carryout G2 has
40000 steps/rev for AZ and 24960 steps/rev for EL.
"""
state: BirdcageState = ctx.request_context.lifespan_context
_require_device(state)
return state.device.get_step_positions()
@mcp.tool()
async def get_el_limits(ctx: Context) -> dict:
"""Read the firmware elevation angle limits.
Returns {"min": float, "max": float, "home": float} in degrees.
The Carryout G2 defaults are min=18, max=65, home=65.
"""
state: BirdcageState = ctx.request_context.lifespan_context
_require_device(state)
return state.device.get_el_limits()

View File

@ -0,0 +1,102 @@
"""Satellite tracking tools — search, passes, visible targets."""
from dataclasses import asdict
from fastmcp import Context
from birdcage_mcp.state import BirdcageState
def _require_craft(state: BirdcageState):
if state.craft_client is None:
raise RuntimeError("No Craft API client configured.")
def register(mcp):
"""Register satellite tools on the FastMCP instance."""
@mcp.tool()
async def search_satellites(
ctx: Context,
query: str,
limit: int = 20,
) -> dict:
"""Search the orbital object catalog for satellites, planets, and stars.
Uses the Craft API (space.warehack.ing) to find objects by name.
Returns matches with name, type, NORAD ID (for satellites), and
current altitude if above horizon.
Args:
query: Search term (e.g. "ISS", "NOAA", "Moon").
limit: Maximum results to return.
"""
state: BirdcageState = ctx.request_context.lifespan_context
_require_craft(state)
results = state.craft_client.search(query, limit)
return {
"results": [asdict(r) for r in results],
"count": len(results),
}
@mcp.tool()
async def get_passes(
ctx: Context,
norad_id: int,
hours: int = 24,
) -> dict:
"""Get upcoming pass predictions for a satellite.
Returns AOS/TCA/LOS times, azimuths, max elevation, duration,
and visibility for each predicted pass.
Args:
norad_id: NORAD catalog number (e.g. 25544 for ISS).
hours: How many hours ahead to predict (default 24).
"""
state: BirdcageState = ctx.request_context.lifespan_context
_require_craft(state)
passes = state.craft_client.get_passes(norad_id, hours)
return {
"passes": [asdict(p) for p in passes],
"count": len(passes),
}
@mcp.tool()
async def get_next_pass(ctx: Context, norad_id: int) -> dict:
"""Get the next upcoming pass for a satellite.
Returns the single soonest pass prediction, or null if none
are predicted in the near future.
Args:
norad_id: NORAD catalog number (e.g. 25544 for ISS).
"""
state: BirdcageState = ctx.request_context.lifespan_context
_require_craft(state)
p = state.craft_client.get_next_pass(norad_id)
if p is None:
return {"pass": None}
return {"pass": asdict(p)}
@mcp.tool()
async def get_visible_targets(
ctx: Context,
min_alt: float = 0.0,
) -> dict:
"""Get all objects currently above the horizon with real-time positions.
Returns satellites, planets, and stars with AZ/EL computed
server-side. This is the primary tracking endpoint use it
to find what's overhead right now.
Args:
min_alt: Minimum altitude in degrees (default 0 = horizon).
"""
state: BirdcageState = ctx.request_context.lifespan_context
_require_craft(state)
targets = state.craft_client.get_visible_targets(min_alt)
return {
"targets": [asdict(t) for t in targets],
"count": len(targets),
}

View File

@ -0,0 +1,150 @@
"""Signal measurement tools — RSSI, DVB config, LNA, and AZ sweep."""
from fastmcp import Context
from birdcage_mcp.state import BirdcageState
def _require_device(state: BirdcageState):
if state.device is None:
raise RuntimeError("No device connected. Use the 'connect' tool first.")
def register(mcp):
"""Register signal tools on the FastMCP instance."""
@mcp.tool()
async def get_rssi(ctx: Context, iterations: int = 10) -> dict:
"""Read averaged RSSI signal strength from the BCM4515 DVB tuner.
Returns {"reads": int, "average": int, "current": int}.
Noise floor is approximately 500 ADC counts.
Args:
iterations: Number of samples to average (default 10).
"""
state: BirdcageState = ctx.request_context.lifespan_context
_require_device(state)
return state.device.get_rssi(iterations)
@mcp.tool()
async def get_adc_rssi(ctx: Context) -> dict:
"""Read single-shot ADC RSSI value (raw, unbounded).
Returns the raw ADC count from the hardware ADC subsystem,
bypassing the DVB tuner's averaging. Useful for quick signal checks.
"""
state: BirdcageState = ctx.request_context.lifespan_context
_require_device(state)
raw = state.device.get_adc_rssi()
return {"raw": raw}
@mcp.tool()
async def get_lock_status(ctx: Context) -> dict:
"""Check DVB signal lock status (single-shot).
Returns the raw firmware response string containing lock state,
RSSI, and glitch count.
"""
state: BirdcageState = ctx.request_context.lifespan_context
_require_device(state)
raw = state.device.get_lock_status()
return {"raw": raw}
@mcp.tool()
async def enable_lna(ctx: Context) -> dict:
"""Enable the LNA by setting LNB voltage to 13V (V-pol).
Shortcut for set_lnb_voltage(mode='odu'). Use set_lnb_voltage
to switch back to 18V (H-pol) when done.
"""
state: BirdcageState = ctx.request_context.lifespan_context
_require_device(state)
state.device.enable_lna()
return {"status": "lna_enabled", "voltage": "13V", "polarity": "V"}
@mcp.tool()
async def set_lnb_voltage(ctx: Context, mode: str) -> dict:
"""Set LNB DC voltage and polarization.
Controls the LNB bias voltage on the coax feed line. This
determines both LNA power state and receive polarization:
- 'odu' = 13V (V-pol, LNA enabled) for radio astronomy / ham
- 'stb' = 18V (H-pol, boot default) standard consumer mode
V-pol and H-pol see different transponders. The PEAK submenu's
'rssits' command alternates between them for comparison.
Args:
mode: 'odu' for 13V/V-pol or 'stb' for 18V/H-pol.
"""
state: BirdcageState = ctx.request_context.lifespan_context
_require_device(state)
raw = state.device.set_lnb_voltage(mode)
voltage = "13V" if mode.lower() == "odu" else "18V"
polarity = "V" if mode.lower() == "odu" else "H"
return {
"status": "set",
"mode": mode.lower(),
"voltage": voltage,
"polarity": polarity,
"raw": raw,
}
@mcp.tool()
async def get_dvb_config(ctx: Context) -> dict:
"""Read BCM4515 DVB tuner hardware and firmware version.
Returns chip ID, revision, firmware version, and strap config.
"""
state: BirdcageState = ctx.request_context.lifespan_context
_require_device(state)
raw = state.device.get_dvb_config()
return {"raw": raw}
@mcp.tool()
async def get_channel_params(ctx: Context) -> dict:
"""Read current DVB channel parameters.
Returns frequency, symbol rate, modulation, LNB polarity,
tone settings, and scan configuration.
"""
state: BirdcageState = ctx.request_context.lifespan_context
_require_device(state)
raw = state.device.get_channel_params()
return {"raw": raw}
@mcp.tool()
async def az_sweep(
ctx: Context,
start_az: float,
span: float,
step_cdeg: int = 100,
num_xponders: int = 1,
timeout: float = 120.0,
) -> dict:
"""Execute a firmware-accelerated azimuth sweep (azscanwxp).
Moves to start_az, then sweeps span degrees while measuring
RSSI/Lock/SNR at each step. The firmware handles motor movement
and measurement atomically no per-point serial round-trips.
Returns a list of measurement points, each containing:
az (degrees), rssi (ADC counts), lock (0/1), snr (dB).
SAFETY: Requires homed motors. Do NOT run on uncalibrated axes.
Args:
start_az: Starting azimuth in degrees.
span: Total sweep width in degrees.
step_cdeg: Step size in centidegrees (100 = 1.00 deg).
num_xponders: Transponders to cycle per position.
timeout: Max wait time for the sweep to complete.
"""
state: BirdcageState = ctx.request_context.lifespan_context
_require_device(state)
points = state.device.az_sweep_firmware(
start_az, span, step_cdeg, num_xponders, timeout
)
return {"points": points, "count": len(points)}

View File

@ -0,0 +1,178 @@
"""System and firmware tools — identification, motor dynamics, A3981, NVS."""
from fastmcp import Context
from birdcage_mcp.state import BirdcageState
def _require_device(state: BirdcageState):
if state.device is None:
raise RuntimeError("No device connected. Use the 'connect' tool first.")
def register(mcp):
"""Register system tools on the FastMCP instance."""
@mcp.tool()
async def get_firmware_id(ctx: Context) -> dict:
"""Read full MCU and firmware identification.
Returns NVS version, system ID, silicon info, board ID, antenna
ID, software version, clock frequencies, and flash layout.
"""
state: BirdcageState = ctx.request_context.lifespan_context
_require_device(state)
raw = state.device.get_firmware_id()
return {"raw": raw}
@mcp.tool()
async def get_motor_dynamics(ctx: Context) -> dict:
"""Read max velocity and acceleration for both motor axes.
Returns {"az_max_vel", "el_max_vel", "az_accel", "el_accel"}
in degrees/sec and degrees/sec^2 respectively.
"""
state: BirdcageState = ctx.request_context.lifespan_context
_require_device(state)
return state.device.get_motor_dynamics()
@mcp.tool()
async def set_max_velocity(
ctx: Context,
motor_id: int,
deg_per_sec: float,
) -> dict:
"""Set the maximum velocity for a motor axis.
Args:
motor_id: 0 for azimuth, 1 for elevation.
deg_per_sec: Maximum velocity in degrees per second.
Carryout G2 defaults: AZ=65, EL=45.
"""
state: BirdcageState = ctx.request_context.lifespan_context
_require_device(state)
if motor_id not in (0, 1):
raise ValueError("motor_id must be 0 (azimuth) or 1 (elevation)")
state.device.set_max_velocity(motor_id, deg_per_sec)
axis = "azimuth" if motor_id == 0 else "elevation"
return {"status": "set", "axis": axis, "max_velocity": deg_per_sec}
@mcp.tool()
async def set_max_acceleration(
ctx: Context,
motor_id: int,
accel: float,
) -> dict:
"""Set the maximum acceleration for a motor axis.
Args:
motor_id: 0 for azimuth, 1 for elevation.
accel: Maximum acceleration in degrees/sec^2.
Carryout G2 default: 400 for both axes.
"""
state: BirdcageState = ctx.request_context.lifespan_context
_require_device(state)
if motor_id not in (0, 1):
raise ValueError("motor_id must be 0 (azimuth) or 1 (elevation)")
state.device.set_max_acceleration(motor_id, accel)
axis = "azimuth" if motor_id == 0 else "elevation"
return {"status": "set", "axis": axis, "max_acceleration": accel}
@mcp.tool()
async def get_motor_life(ctx: Context) -> dict:
"""Read motor lifetime and usage statistics.
Returns total moves, total degrees traveled, and uptime hours
for both AZ and EL axes.
"""
state: BirdcageState = ctx.request_context.lifespan_context
_require_device(state)
raw = state.device.get_motor_life()
return {"raw": raw}
@mcp.tool()
async def get_pid_gains(ctx: Context) -> dict:
"""Read PID controller gains for both motor axes.
Returns {"az": {"kp", "kv", "ki"}, "el": {"kp", "kv", "ki"}}.
Carryout G2 defaults: AZ Kp=600/Kv=60/Ki=1, EL Kp=250/Kv=50/Ki=1.
"""
state: BirdcageState = ctx.request_context.lifespan_context
_require_device(state)
return state.device.get_pid_gains()
@mcp.tool()
async def set_pid_gains(
ctx: Context,
motor_id: int,
kp: float,
kv: float,
ki: float,
) -> dict:
"""Set PID controller gains for a motor axis.
CAUTION: Incorrect PID values can cause oscillation or motor damage.
Args:
motor_id: 0 for azimuth, 1 for elevation.
kp: Proportional gain.
kv: Velocity (derivative) gain.
ki: Integral gain.
"""
state: BirdcageState = ctx.request_context.lifespan_context
_require_device(state)
if motor_id not in (0, 1):
raise ValueError("motor_id must be 0 (azimuth) or 1 (elevation)")
state.device.set_pid_gains(motor_id, kp, kv, ki)
axis = "azimuth" if motor_id == 0 else "elevation"
return {"status": "set", "axis": axis, "kp": kp, "kv": kv, "ki": ki}
@mcp.tool()
async def get_a3981_diag(ctx: Context) -> dict:
"""Read Allegro A3981 stepper driver diagnostic status.
Returns fault status for both AZ and EL motor drivers.
Normal response is "AZ DIAG: OK / EL DIAG: OK".
"""
state: BirdcageState = ctx.request_context.lifespan_context
_require_device(state)
raw = state.device.get_a3981_diag()
return {"raw": raw}
@mcp.tool()
async def get_a3981_modes(ctx: Context) -> dict:
"""Read A3981 stepper driver operating modes.
Returns step mode (AUTO/fixed), current mode (AUTO/HiZ/LoZ),
and step size (FULL=16 through SIXTEENTH=1) for both drivers.
"""
state: BirdcageState = ctx.request_context.lifespan_context
_require_device(state)
return state.device.get_a3981_modes()
@mcp.tool()
async def nvs_dump(ctx: Context) -> dict:
"""Dump all non-volatile storage (NVS) values.
Returns the complete NVS table with index, name, current value,
saved value, and default value for every setting.
"""
state: BirdcageState = ctx.request_context.lifespan_context
_require_device(state)
raw = state.device.nvs_dump()
return {"raw": raw}
@mcp.tool()
async def nvs_read(ctx: Context, index: int) -> dict:
"""Read a single NVS value by index.
Key indices: 20=Disable Tracker, 80=AZ Max Vel, 85=EL Max Vel,
101=Min EL, 102=Max EL, 128-133=PID gains.
Args:
index: NVS index number (0-143).
"""
state: BirdcageState = ctx.request_context.lifespan_context
_require_device(state)
raw = state.device.nvs_read(index)
return {"index": index, "raw": raw}

74
mcp/tests/conftest.py Normal file
View File

@ -0,0 +1,74 @@
"""Shared fixtures for birdcage-mcp tests.
Uses FastMCP's run_server_async to spin up a real MCP server backed by
DemoDevice + DemoCraftClient. No serial hardware, no subprocesses.
"""
import json
from contextlib import asynccontextmanager
import pytest
from birdcage.demo import DemoCraftClient, DemoDevice
from fastmcp import Client, FastMCP
from fastmcp.client.transports import StreamableHttpTransport
from fastmcp.utilities.tests import run_server_async
from birdcage_mcp.state import BirdcageState
@asynccontextmanager
async def _test_lifespan(server: FastMCP):
"""Test lifespan that pre-connects a DemoDevice."""
device = DemoDevice()
device.connect()
device.initialize()
state = BirdcageState(
device=device,
craft_client=DemoCraftClient(),
demo_mode=True,
serial_port="/dev/demo",
firmware_name="g2",
connected=True,
)
yield state
def _build_server() -> FastMCP:
"""Build a FastMCP server with all tools registered."""
from birdcage_mcp import prompts, resources
from birdcage_mcp.tools import (
connection,
console,
movement,
satellite,
signal,
system,
)
mcp = FastMCP("birdcage-test", lifespan=_test_lifespan)
connection.register(mcp)
movement.register(mcp)
signal.register(mcp)
system.register(mcp)
satellite.register(mcp)
console.register(mcp)
resources.register(mcp)
prompts.register(mcp)
return mcp
@pytest.fixture
async def mcp_client():
"""Yield a connected MCP client backed by DemoDevice."""
server = _build_server()
async with (
run_server_async(server) as url,
Client(StreamableHttpTransport(url)) as client,
):
yield client
def parse_result(result) -> dict:
"""Extract the dict from a tool call result."""
return json.loads(result.content[0].text)

View File

@ -0,0 +1,29 @@
"""Tests for connection tools (connect, disconnect, status)."""
import pytest
from conftest import parse_result
from fastmcp import Client
@pytest.mark.anyio
async def test_status_shows_connected(mcp_client: Client):
result = await mcp_client.call_tool("status", {})
data = parse_result(result)
assert data["connected"] is True
assert data["demo_mode"] is True
assert data["firmware"] == "g2"
@pytest.mark.anyio
async def test_connect_demo_noop(mcp_client: Client):
result = await mcp_client.call_tool("connect", {})
data = parse_result(result)
assert data["mode"] == "demo"
assert data["status"] == "connected"
@pytest.mark.anyio
async def test_disconnect_demo_noop(mcp_client: Client):
result = await mcp_client.call_tool("disconnect", {})
data = parse_result(result)
assert data["status"] == "demo"

56
mcp/tests/test_console.py Normal file
View File

@ -0,0 +1,56 @@
"""Tests for raw console tool (safety-gated)."""
import pytest
from conftest import parse_result
from fastmcp import Client
@pytest.mark.anyio
async def test_send_raw_command(mcp_client: Client):
result = await mcp_client.call_tool(
"send_raw_command", {"command": "?"}
)
data = parse_result(result)
assert "response" in data
assert "Available commands" in data["response"]
@pytest.mark.anyio
async def test_q_command_blocked(mcp_client: Client):
result = await mcp_client.call_tool(
"send_raw_command", {"command": "q"}
)
data = parse_result(result)
assert data["blocked"] is True
assert "power cycle" in data["error"]
@pytest.mark.anyio
async def test_q_command_blocked_with_whitespace(mcp_client: Client):
result = await mcp_client.call_tool(
"send_raw_command", {"command": " Q "}
)
data = parse_result(result)
assert data["blocked"] is True
@pytest.mark.anyio
async def test_submenu_navigation(mcp_client: Client):
result = await mcp_client.call_tool(
"send_raw_command", {"command": "mot"}
)
data = parse_result(result)
assert "MOT>" in data["response"]
@pytest.mark.anyio
async def test_raw_motor_query(mcp_client: Client):
# Enter mot submenu first
await mcp_client.call_tool("send_raw_command", {"command": "mot"})
# Query position
result = await mcp_client.call_tool(
"send_raw_command", {"command": "a"}
)
data = parse_result(result)
assert "Angle[0]" in data["response"]
assert "Angle[1]" in data["response"]

100
mcp/tests/test_movement.py Normal file
View File

@ -0,0 +1,100 @@
"""Tests for movement tools."""
import pytest
from conftest import parse_result
from fastmcp import Client
from fastmcp.exceptions import ToolError
@pytest.mark.anyio
async def test_get_position(mcp_client: Client):
result = await mcp_client.call_tool("get_position", {})
data = parse_result(result)
assert "azimuth" in data
assert "elevation" in data
assert isinstance(data["azimuth"], float)
assert isinstance(data["elevation"], float)
@pytest.mark.anyio
async def test_move_to(mcp_client: Client):
result = await mcp_client.call_tool(
"move_to", {"azimuth": 200.0, "elevation": 40.0}
)
data = parse_result(result)
assert data["status"] == "moving"
assert data["target_azimuth"] == 200.0
assert data["target_elevation"] == 40.0
@pytest.mark.anyio
async def test_move_motor_az(mcp_client: Client):
result = await mcp_client.call_tool(
"move_motor", {"motor_id": 0, "degrees": 90.0}
)
data = parse_result(result)
assert data["axis"] == "azimuth"
assert data["target"] == 90.0
@pytest.mark.anyio
async def test_move_motor_el(mcp_client: Client):
result = await mcp_client.call_tool(
"move_motor", {"motor_id": 1, "degrees": 30.0}
)
data = parse_result(result)
assert data["axis"] == "elevation"
@pytest.mark.anyio
async def test_move_motor_invalid_id(mcp_client: Client):
with pytest.raises(ToolError):
await mcp_client.call_tool(
"move_motor", {"motor_id": 2, "degrees": 10.0}
)
@pytest.mark.anyio
async def test_home_motor(mcp_client: Client):
result = await mcp_client.call_tool("home_motor", {"motor_id": 1})
data = parse_result(result)
assert data["status"] == "homing"
assert data["axis"] == "elevation"
@pytest.mark.anyio
async def test_engage_release(mcp_client: Client):
result = await mcp_client.call_tool("release_motors", {})
data = parse_result(result)
assert data["status"] == "released"
result = await mcp_client.call_tool("engage_motors", {})
data = parse_result(result)
assert data["status"] == "engaged"
@pytest.mark.anyio
async def test_stow(mcp_client: Client):
result = await mcp_client.call_tool("stow", {})
data = parse_result(result)
assert data["status"] == "stowing"
assert data["target_azimuth"] == 0.0
assert data["target_elevation"] == 65.0
@pytest.mark.anyio
async def test_get_step_positions(mcp_client: Client):
result = await mcp_client.call_tool("get_step_positions", {})
data = parse_result(result)
assert "az_steps" in data
assert "el_steps" in data
assert isinstance(data["az_steps"], int)
@pytest.mark.anyio
async def test_get_el_limits(mcp_client: Client):
result = await mcp_client.call_tool("get_el_limits", {})
data = parse_result(result)
assert data["min"] == 18.0
assert data["max"] == 65.0
assert data["home"] == 65.0

View File

@ -0,0 +1,86 @@
"""Tests for satellite tracking tools."""
import pytest
from conftest import parse_result
from fastmcp import Client
@pytest.mark.anyio
async def test_search_satellites(mcp_client: Client):
result = await mcp_client.call_tool(
"search_satellites", {"query": "ISS"}
)
data = parse_result(result)
assert data["count"] >= 1
names = [r["name"] for r in data["results"]]
assert any("ISS" in n for n in names)
@pytest.mark.anyio
async def test_search_no_results(mcp_client: Client):
result = await mcp_client.call_tool(
"search_satellites", {"query": "nonexistent_xyz"}
)
data = parse_result(result)
assert data["count"] == 0
@pytest.mark.anyio
async def test_search_with_limit(mcp_client: Client):
result = await mcp_client.call_tool(
"search_satellites", {"query": "", "limit": 3}
)
data = parse_result(result)
# Empty query won't match any catalog entries
assert data["count"] >= 0
@pytest.mark.anyio
async def test_get_passes(mcp_client: Client):
result = await mcp_client.call_tool(
"get_passes", {"norad_id": 25544}
)
data = parse_result(result)
assert data["count"] > 0
p = data["passes"][0]
assert "aos_time" in p
assert "tca_time" in p
assert "los_time" in p
assert "max_elevation" in p
assert p["norad_id"] == 25544
@pytest.mark.anyio
async def test_get_next_pass(mcp_client: Client):
result = await mcp_client.call_tool(
"get_next_pass", {"norad_id": 25544}
)
data = parse_result(result)
assert data["pass"] is not None
assert data["pass"]["satellite_name"] == "ISS (ZARYA)"
@pytest.mark.anyio
async def test_get_visible_targets(mcp_client: Client):
result = await mcp_client.call_tool(
"get_visible_targets", {}
)
data = parse_result(result)
# Celestial bodies (Moon, Sun, Jupiter) are always visible
assert data["count"] >= 1
names = [t["name"] for t in data["targets"]]
# At least celestial bodies should be present
assert any(
n in names for n in ("Moon", "Sun", "Jupiter")
)
@pytest.mark.anyio
async def test_get_visible_targets_with_min_alt(mcp_client: Client):
result = await mcp_client.call_tool(
"get_visible_targets", {"min_alt": 80.0}
)
data = parse_result(result)
# Very high min_alt should filter out most targets
for t in data["targets"]:
assert t["altitude"] >= 80.0

112
mcp/tests/test_signal.py Normal file
View File

@ -0,0 +1,112 @@
"""Tests for signal measurement tools."""
import pytest
from conftest import parse_result
from fastmcp import Client
from fastmcp.exceptions import ToolError
@pytest.mark.anyio
async def test_get_rssi_default(mcp_client: Client):
result = await mcp_client.call_tool("get_rssi", {})
data = parse_result(result)
assert data["reads"] == 10
assert isinstance(data["average"], int)
assert isinstance(data["current"], int)
@pytest.mark.anyio
async def test_get_rssi_custom_iterations(mcp_client: Client):
result = await mcp_client.call_tool("get_rssi", {"iterations": 5})
data = parse_result(result)
assert data["reads"] == 5
@pytest.mark.anyio
async def test_get_adc_rssi(mcp_client: Client):
result = await mcp_client.call_tool("get_adc_rssi", {})
data = parse_result(result)
assert "raw" in data
@pytest.mark.anyio
async def test_get_lock_status(mcp_client: Client):
result = await mcp_client.call_tool("get_lock_status", {})
data = parse_result(result)
assert "raw" in data
assert "Lock:" in data["raw"]
@pytest.mark.anyio
async def test_enable_lna(mcp_client: Client):
result = await mcp_client.call_tool("enable_lna", {})
data = parse_result(result)
assert data["status"] == "lna_enabled"
assert data["voltage"] == "13V"
assert data["polarity"] == "V"
@pytest.mark.anyio
async def test_set_lnb_voltage_odu(mcp_client: Client):
result = await mcp_client.call_tool(
"set_lnb_voltage", {"mode": "odu"}
)
data = parse_result(result)
assert data["voltage"] == "13V"
assert data["polarity"] == "V"
assert data["mode"] == "odu"
@pytest.mark.anyio
async def test_set_lnb_voltage_stb(mcp_client: Client):
result = await mcp_client.call_tool(
"set_lnb_voltage", {"mode": "stb"}
)
data = parse_result(result)
assert data["voltage"] == "18V"
assert data["polarity"] == "H"
assert data["mode"] == "stb"
@pytest.mark.anyio
async def test_set_lnb_voltage_invalid(mcp_client: Client):
with pytest.raises(ToolError):
await mcp_client.call_tool(
"set_lnb_voltage", {"mode": "invalid"}
)
@pytest.mark.anyio
async def test_get_dvb_config(mcp_client: Client):
result = await mcp_client.call_tool("get_dvb_config", {})
data = parse_result(result)
assert "BCM" in data["raw"]
assert "0x4515" in data["raw"]
@pytest.mark.anyio
async def test_get_channel_params(mcp_client: Client):
result = await mcp_client.call_tool("get_channel_params", {})
data = parse_result(result)
assert "Frequency" in data["raw"]
@pytest.mark.anyio
async def test_az_sweep(mcp_client: Client):
result = await mcp_client.call_tool(
"az_sweep",
{
"start_az": 195.0,
"span": 10.0,
"step_cdeg": 200,
"num_xponders": 1,
},
)
data = parse_result(result)
assert data["count"] > 0
assert len(data["points"]) == data["count"]
pt = data["points"][0]
assert "az" in pt
assert "rssi" in pt
assert "lock" in pt
assert "snr" in pt

121
mcp/tests/test_system.py Normal file
View File

@ -0,0 +1,121 @@
"""Tests for system and firmware tools."""
import pytest
from conftest import parse_result
from fastmcp import Client
from fastmcp.exceptions import ToolError
@pytest.mark.anyio
async def test_get_firmware_id(mcp_client: Client):
result = await mcp_client.call_tool("get_firmware_id", {})
data = parse_result(result)
assert "02.02.48" in data["raw"]
assert "TWELINCH" in data["raw"]
@pytest.mark.anyio
async def test_get_motor_dynamics(mcp_client: Client):
result = await mcp_client.call_tool("get_motor_dynamics", {})
data = parse_result(result)
assert data["az_max_vel"] == 65.0
assert data["el_max_vel"] == 45.0
assert data["az_accel"] == 400.0
assert data["el_accel"] == 400.0
@pytest.mark.anyio
async def test_set_max_velocity(mcp_client: Client):
result = await mcp_client.call_tool(
"set_max_velocity", {"motor_id": 0, "deg_per_sec": 30.0}
)
data = parse_result(result)
assert data["axis"] == "azimuth"
assert data["max_velocity"] == 30.0
# Verify it stuck
result = await mcp_client.call_tool("get_motor_dynamics", {})
data = parse_result(result)
assert data["az_max_vel"] == 30.0
@pytest.mark.anyio
async def test_set_max_velocity_invalid_motor(mcp_client: Client):
with pytest.raises(ToolError):
await mcp_client.call_tool(
"set_max_velocity", {"motor_id": 3, "deg_per_sec": 10.0}
)
@pytest.mark.anyio
async def test_set_max_acceleration(mcp_client: Client):
result = await mcp_client.call_tool(
"set_max_acceleration", {"motor_id": 1, "accel": 200.0}
)
data = parse_result(result)
assert data["axis"] == "elevation"
assert data["max_acceleration"] == 200.0
@pytest.mark.anyio
async def test_get_motor_life(mcp_client: Client):
result = await mcp_client.call_tool("get_motor_life", {})
data = parse_result(result)
assert "AZ total moves" in data["raw"]
@pytest.mark.anyio
async def test_get_pid_gains(mcp_client: Client):
result = await mcp_client.call_tool("get_pid_gains", {})
data = parse_result(result)
assert data["az"]["kp"] == 600.0
assert data["az"]["kv"] == 60.0
assert data["el"]["kp"] == 250.0
@pytest.mark.anyio
async def test_set_pid_gains(mcp_client: Client):
result = await mcp_client.call_tool(
"set_pid_gains",
{"motor_id": 0, "kp": 500.0, "kv": 55.0, "ki": 2.0},
)
data = parse_result(result)
assert data["status"] == "set"
assert data["kp"] == 500.0
@pytest.mark.anyio
async def test_get_a3981_diag(mcp_client: Client):
result = await mcp_client.call_tool("get_a3981_diag", {})
data = parse_result(result)
assert "OK" in data["raw"]
@pytest.mark.anyio
async def test_get_a3981_modes(mcp_client: Client):
result = await mcp_client.call_tool("get_a3981_modes", {})
data = parse_result(result)
assert "AUTO" in data["step_mode"]
@pytest.mark.anyio
async def test_nvs_dump(mcp_client: Client):
result = await mcp_client.call_tool("nvs_dump", {})
data = parse_result(result)
assert "Disable Tracker" in data["raw"]
assert "AZ Max Vel" in data["raw"]
@pytest.mark.anyio
async def test_nvs_read(mcp_client: Client):
result = await mcp_client.call_tool("nvs_read", {"index": 20})
data = parse_result(result)
assert data["index"] == 20
assert "Disable Tracker" in data["raw"]
@pytest.mark.anyio
async def test_nvs_read_missing_index(mcp_client: Client):
result = await mcp_client.call_tool("nvs_read", {"index": 999})
data = parse_result(result)
assert "not found" in data["raw"]

1678
mcp/uv.lock generated Normal file

File diff suppressed because it is too large Load Diff

View File

@ -1,6 +1,15 @@
"""birdcage: Winegard satellite dish control for amateur radio sky tracking."""
from birdcage.antenna import AntennaConfig, BirdcageAntenna
from birdcage.bridge import SerialBridge
from birdcage.craft_client import (
CraftClient,
CraftTrackingState,
PassPrediction,
SearchResult,
TargetPosition,
)
from birdcage.demo import DemoCraftClient, DemoDevice
from birdcage.leapfrog import apply_leapfrog
from birdcage.protocol import (
CarryoutG2Protocol,
@ -16,11 +25,19 @@ __all__ = [
"AntennaConfig",
"BirdcageAntenna",
"CarryoutG2Protocol",
"CraftClient",
"CraftTrackingState",
"DemoCraftClient",
"DemoDevice",
"FirmwareProtocol",
"HAL000Protocol",
"HAL205Protocol",
"PassPrediction",
"Position",
"RssiReading",
"RotctldServer",
"SearchResult",
"SerialBridge",
"TargetPosition",
"apply_leapfrog",
]

View File

@ -1,6 +1,6 @@
"""Thread-safe bridge between Birdcage TUI and CarryoutG2Protocol.
"""Thread-safe bridge between consumers and CarryoutG2Protocol.
Wraps all serial I/O in a threading.Lock so the TUI's worker threads
Wraps all serial I/O in a threading.Lock so concurrent callers
don't stomp on each other. Tracks the current firmware submenu to
minimize unnecessary q-then-reenter transitions.
"""
@ -53,7 +53,7 @@ _MENU_COMMANDS: dict[Menu, str] = {
class SerialBridge:
"""Thread-safe wrapper around CarryoutG2Protocol for TUI consumption.
"""Thread-safe wrapper around CarryoutG2Protocol.
All public methods acquire a lock before touching the serial port.
The bridge tracks the current firmware submenu so it can skip
@ -68,7 +68,7 @@ class SerialBridge:
self._connected = False
# ------------------------------------------------------------------
# Menu prompt string mapping for status display
# Menu prompt -> string mapping for status display
# ------------------------------------------------------------------
_MENU_PROMPTS: dict[Menu, str] = {
@ -297,7 +297,7 @@ class SerialBridge:
"el_accel": 0.0,
}
# mv "Max Vel [0] = 65.0 Max Vel [1] = 45.0" or similar
# mv -> "Max Vel [0] = 65.0 Max Vel [1] = 45.0" or similar
vel_matches = re.findall(r"Max Vel\s*\[(\d)\]\s*=\s*(-?\d+\.?\d*)", mv_resp)
for motor_id, val in vel_matches:
if motor_id == "0":
@ -305,7 +305,7 @@ class SerialBridge:
elif motor_id == "1":
result["el_max_vel"] = float(val)
# ma "Accel[0] = 400.0 Accel[1] = 400.0"
# ma -> "Accel[0] = 400.0 Accel[1] = 400.0"
acc_matches = re.findall(r"Accel\[(\d)\]\s*=\s*(-?\d+\.?\d*)", ma_resp)
for motor_id, val in acc_matches:
if motor_id == "0":
@ -316,7 +316,7 @@ class SerialBridge:
return result
def set_max_velocity(self, motor_id: int, deg_per_sec: float) -> None:
"""Set max velocity for a motor axis (°/s). Firmware: MOT> mv [motor] [vel]."""
"""Set max velocity for a motor axis (deg/s)."""
with self._lock:
self._ensure_menu(Menu.MOT)
self._send(f"mv {motor_id} {deg_per_sec:.1f}")
@ -324,7 +324,7 @@ class SerialBridge:
def set_max_acceleration(self, motor_id: int, accel: float) -> None:
"""Set max acceleration for a motor axis.
Firmware: MOT> ma [motor] [accel] (°/).
Firmware: MOT> ma [motor] [accel] (deg/s^2).
"""
with self._lock:
self._ensure_menu(Menu.MOT)
@ -460,7 +460,7 @@ class SerialBridge:
Args:
start_az: Starting azimuth in degrees.
span: Total sweep width in degrees.
step_cdeg: Step size in centidegrees (100 = 1.00°).
step_cdeg: Step size in centidegrees (100 = 1.00 deg).
num_xponders: Number of transponders to cycle per position.
timeout: Serial read timeout for the long-running command.
@ -526,10 +526,26 @@ class SerialBridge:
raise ValueError(f"Could not parse RSSI: {response!r}")
def enable_lna(self) -> None:
"""Enable LNA in ODU mode (sets LNB to 13V)."""
"""Enable LNA in ODU mode (13V). Alias for set_lnb_voltage('odu')."""
self.set_lnb_voltage("odu")
def set_lnb_voltage(self, mode: str) -> str:
"""Set LNB DC voltage mode.
Args:
mode: 'odu' for 13V (V-pol, LNA enabled) or 'stb' for 18V (H-pol).
Returns:
Raw firmware response.
"""
mode = mode.strip().lower()
if mode not in ("odu", "stb"):
raise ValueError(
f"Invalid LNB mode {mode!r}: use 'odu' (13V) or 'stb' (18V)"
)
with self._lock:
self._ensure_menu(Menu.DVB)
self._send("lnbdc odu")
return self._send(f"lnbdc {mode}")
def get_lock_status(self) -> str:
"""Read quick lock status (single-shot)."""

View File

@ -2,7 +2,7 @@
Provides satellite search, pass predictions, and real-time sky positions
from the Craft orbital mechanics API. All methods are blocking designed
to be called from @work(thread=True) workers in the TUI.
to be called from worker threads.
Uses urllib.request + json only (no requests/httpx dependency).
"""

View File

@ -1,16 +1,23 @@
"""Synthetic demo device for the Birdcage TUI.
"""Synthetic demo device for Birdcage.
Drop-in replacement for SerialBridge that simulates a Winegard Carryout G2
dish with motor movement, RSSI signal modeling, and canned firmware responses.
No serial hardware required.
Also provides DemoCraftClient a duck-typed replacement for CraftClient that
returns synthetic satellite data with zero HTTP calls. Supports time-varying
LEO arcs so the tracking loop drives real pass events to the camera overlay.
"""
import contextlib
import datetime as _dt
import math
import random
import time
from enum import Enum, auto
from birdcage.craft_client import PassPrediction, SearchResult, TargetPosition
class _DemoMenu(Enum):
"""Simulated firmware submenu states."""
@ -212,6 +219,9 @@ class DemoDevice:
self._az_accel = 400.0
self._el_accel = 400.0
# LNB state (boot default is 18V / H-pol / STB mode).
self._lnb_mode = "stb"
# Submenu tracking for console simulation.
self._menu = _DemoMenu.ROOT
@ -375,6 +385,10 @@ class DemoDevice:
timeout: float = 120,
) -> list[dict[str, float]]:
"""Simulate a firmware azscanwxp sweep with Gaussian signal peak."""
# Snap EL to target so 2D scans compute correct per-row signal.
# Without this, move_motor(1, el) only sets _target_el — _el stays
# stale because the position interpolator never runs mid-sweep.
self._el = self._target_el
step_deg = step_cdeg / 100.0
if step_deg <= 0:
step_deg = 1.0
@ -418,7 +432,18 @@ class DemoDevice:
}
def enable_lna(self) -> None:
pass # No-op in demo mode.
self._lnb_mode = "odu"
def set_lnb_voltage(self, mode: str) -> str:
mode = mode.strip().lower()
if mode not in ("odu", "stb"):
raise ValueError(
f"Invalid LNB mode {mode!r}: use 'odu' (13V) or 'stb' (18V)"
)
self._lnb_mode = mode
if mode == "odu":
return "Enabled LNB ODU"
return "Enabled LNB STB"
def get_lock_status(self) -> str:
rssi = int(self._compute_rssi())
@ -724,3 +749,216 @@ class DemoDevice:
if cmd == "reboot":
return "Rebooting...\nApplication Starting Kinetis PCB...\nTRK>"
return f"Unknown command: {cmd}\nOS>"
# ------------------------------------------------------------------
# DemoCraftClient — offline Craft API replacement
# ------------------------------------------------------------------
# Canned satellite catalog for search results.
_DEMO_CATALOG: list[dict] = [
{
"name": "ISS (ZARYA)",
"type": "satellite",
"id": "25544",
"groups": ["stations"],
},
{
"name": "NOAA 19",
"type": "satellite",
"id": "33591",
"groups": ["weather"],
},
{
"name": "SO-50 (SAUDISAT 1C)",
"type": "satellite",
"id": "27607",
"groups": ["amateur"],
},
{
"name": "TEVEL-2",
"type": "satellite",
"id": "50988",
"groups": ["amateur"],
},
{
"name": "AO-91 (FOX-1B)",
"type": "satellite",
"id": "43017",
"groups": ["amateur"],
},
{
"name": "Moon",
"type": "celestial",
"id": "moon",
"groups": ["solar-system"],
},
{
"name": "Sun",
"type": "celestial",
"id": "sun",
"groups": ["solar-system"],
},
{
"name": "Jupiter",
"type": "celestial",
"id": "jupiter",
"groups": ["solar-system"],
},
]
# LEO arc parameters: (phase_offset_minutes, period_minutes, max_el)
_LEO_ARCS: dict[str, tuple[float, float, float]] = {
"25544": (0.0, 10.0, 55.0), # ISS — primary demo target
"33591": (3.0, 9.0, 42.0), # NOAA 19
"27607": (5.0, 8.5, 38.0), # SO-50
"50988": (7.0, 11.0, 48.0), # TEVEL-2
"43017": (2.0, 9.5, 35.0), # AO-91
}
def _leo_position(target_id: str, t: float) -> tuple[float, float]:
"""Compute time-varying AZ/EL for a simulated LEO satellite.
The arc traces: rise east (AZ ~90, EL 0) -> TCA (~180, max_el)
-> set west (AZ ~270, EL 0) over one period, then resets.
Returns (az, el) where el < 0 means below horizon.
"""
offset, period, max_el = _LEO_ARCS.get(target_id, (0.0, 10.0, 40.0))
period_sec = period * 60.0
phase = ((t + offset * 60.0) % period_sec) / period_sec # 0.0 -> 1.0
# Visible window: phase 0.0-0.5 = above horizon, 0.5-1.0 = below
if phase > 0.5:
return 0.0, -10.0 # Below horizon
# Map 0->0.5 to AZ 90->270, EL 0->max->0 (sine arc)
arc_phase = phase / 0.5 # 0.0 -> 1.0 through the visible pass
az = 90.0 + 180.0 * arc_phase
el = max_el * math.sin(math.pi * arc_phase)
return az, el
def _celestial_position(target_id: str, t: float) -> tuple[float, float]:
"""Slow-drift positions for celestial bodies."""
if target_id == "moon":
az = 145.0 + 0.5 * math.sin(t / 600.0) * (t / 60.0 % 10)
el = 32.0 + 3.0 * math.sin(t / 900.0)
return az, max(el, 5.0)
if target_id == "sun":
az = 210.0 + 0.3 * (t / 60.0 % 15)
el = 45.0 + 5.0 * math.sin(t / 1200.0)
return az, max(el, 10.0)
# Jupiter — nearly fixed
return 255.0, 28.0
class DemoCraftClient:
"""Offline replacement for CraftClient returning synthetic orbital data.
Duck-typed to match CraftClient's interface. No HTTP calls are made.
LEO satellites trace realistic arcs using time.monotonic() so the
tracking loop sees genuine AOS/TCA/LOS transitions.
"""
def __init__(self) -> None:
self._t0 = time.monotonic()
def health(self) -> bool:
return True
def search(self, query: str, limit: int = 20) -> list[SearchResult]:
q_lower = query.lower()
results = []
for entry in _DEMO_CATALOG:
if q_lower in entry["name"].lower():
results.append(
SearchResult(
name=entry["name"],
target_type=entry["type"],
target_id=entry["id"],
score=1.0,
groups=entry["groups"],
)
)
if len(results) >= limit:
break
return results
def get_passes(self, norad_id: int, hours: int = 24) -> list[PassPrediction]:
now = _dt.datetime.now(tz=_dt.UTC)
name = "Unknown"
for entry in _DEMO_CATALOG:
if entry["id"] == str(norad_id):
name = entry["name"]
break
arc_params = _LEO_ARCS.get(str(norad_id), (0.0, 10.0, 40.0))
_, period, max_el = arc_params
passes = []
for i in range(4):
aos = now + _dt.timedelta(minutes=30 * (i + 1))
tca = aos + _dt.timedelta(minutes=period / 2)
los = aos + _dt.timedelta(minutes=period)
duration = int(period * 60)
passes.append(
PassPrediction(
satellite_name=name,
norad_id=norad_id,
aos_time=aos.strftime("%Y-%m-%dT%H:%M:%S"),
aos_az=90.0 + random.uniform(-10, 10),
tca_time=tca.strftime("%Y-%m-%dT%H:%M:%S"),
tca_alt=max_el,
tca_az=180.0 + random.uniform(-15, 15),
los_time=los.strftime("%Y-%m-%dT%H:%M:%S"),
los_az=270.0 + random.uniform(-10, 10),
max_elevation=max_el,
duration_seconds=duration,
is_visible=i < 2,
)
)
return passes
def get_next_pass(self, norad_id: int) -> PassPrediction | None:
passes = self.get_passes(norad_id)
return passes[0] if passes else None
def get_visible_targets(self, min_alt: float = 0.0) -> list[TargetPosition]:
t = time.monotonic() - self._t0
targets = []
for entry in _DEMO_CATALOG:
tid = entry["id"]
ttype = entry["type"]
if ttype == "satellite" and tid in _LEO_ARCS:
az, el = _leo_position(tid, t)
elif ttype == "celestial":
az, el = _celestial_position(tid, t)
else:
continue
if el < min_alt:
continue
# Synthetic distance/range-rate for LEO targets
if ttype == "satellite":
dist = 400.0 + 200.0 * math.cos(math.pi * el / 90.0)
rr = -2.0 + 4.0 * math.sin(t / 120.0)
else:
dist = 384400.0 if tid == "moon" else 0.0
rr = 0.0
targets.append(
TargetPosition(
name=entry["name"],
target_type=ttype,
target_id=tid,
azimuth=round(az, 2),
altitude=round(el, 2),
distance_km=round(dist, 1),
range_rate=round(rr, 3),
)
)
return targets

View File

@ -0,0 +1,160 @@
"""Capture all TUI screenshots for documentation.
Runs the Birdcage TUI in demo mode using Textual's Pilot API,
navigates to each screen/sub-mode, and exports SVG + PNG screenshots.
Usage:
cd tui && uv run python scripts/capture_screenshots.py
Output goes to ../site/public/screenshots/
"""
import asyncio
import subprocess
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).resolve().parent.parent / "src"))
from birdcage_tui.app import BirdcageApp
OUTPUT_DIR = (
Path(__file__).resolve().parent.parent.parent / "site" / "public" / "screenshots"
)
TERMINAL_SIZE = (120, 42)
# Demo search results for craft-search screenshot
DEMO_SEARCH_RESULTS = [
{
"name": "ISS (ZARYA)",
"target_type": "satellite",
"target_id": "25544",
"groups": ["stations"],
},
{
"name": "NOAA 19",
"target_type": "satellite",
"target_id": "33591",
"groups": ["weather"],
},
{
"name": "SO-50 (SAUDISAT 1C)",
"target_type": "satellite",
"target_id": "27607",
"groups": ["amateur"],
},
{
"name": "TEVEL-2",
"target_type": "satellite",
"target_id": "50988",
"groups": ["amateur"],
},
{
"name": "AO-91 (FOX-1B)",
"target_type": "satellite",
"target_id": "43017",
"groups": ["amateur"],
},
]
def save(name: str, svg: str) -> None:
"""Write SVG and convert to PNG via rsvg-convert."""
svg_path = OUTPUT_DIR / f"{name}.svg"
png_path = OUTPUT_DIR / f"{name}.png"
svg_path.write_text(svg)
print(f" {svg_path.name}")
try:
subprocess.run(
["rsvg-convert", "-o", str(png_path), str(svg_path)],
check=True,
capture_output=True,
)
print(f" {png_path.name}")
except FileNotFoundError:
print(" WARNING: rsvg-convert not found, skipping PNG")
async def capture_all() -> None:
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
app = BirdcageApp()
app.demo_mode = True
async with app.run_test(size=TERMINAL_SIZE) as pilot:
await pilot.pause(1.0)
# ---- F1 Dashboard ----
print("F1 Dashboard")
await pilot.press("f1")
await pilot.pause(0.5)
save("tui-dashboard", app.export_screenshot())
# ---- F2 Control (Manual mode) ----
print("F2 Control")
await pilot.press("f2")
await pilot.pause(0.5)
save("tui-control", app.export_screenshot())
# ---- F2 Control > Craft (search results) ----
print("F2 Control > Craft (search)")
craft_mode_btn = app.query_one("#mode-craft")
craft_mode_btn.press()
await pilot.pause(0.5)
# Populate search results directly via widget API
craft_panel = app.query_one("#ctrl-craft-panel")
craft_panel.set_search_results(DEMO_SEARCH_RESULTS)
await pilot.pause(0.3)
save("tui-craft-search", app.export_screenshot())
# ---- F2 Control > Craft (tracking Moon) ----
print("F2 Control > Craft (tracking)")
# Clear results and set tracking state
craft_panel.set_search_results([])
craft_panel.set_tracking_status(
state="TRACKING",
target_name="Moon",
azimuth=145.00,
elevation=32.01,
distance_km=384400,
range_rate=-0.3,
moves=47,
rate=1.0,
)
await pilot.pause(0.3)
save("tui-craft-tracking", app.export_screenshot())
# ---- F3 Signal (Monitor mode) ----
print("F3 Signal")
await pilot.press("f3")
await pilot.pause(0.5)
save("tui-signal", app.export_screenshot())
# ---- F4 System ----
print("F4 System")
await pilot.press("f4")
await pilot.pause(0.5)
save("tui-system", app.export_screenshot())
# ---- F5 Console overlay ----
print("F5 Console")
await pilot.press("f5")
await pilot.pause(0.5)
save("tui-console", app.export_screenshot())
await pilot.press("f5")
await pilot.pause(0.3)
# ---- F6 Camera overlay ----
print("F6 Camera")
await pilot.press("f6")
await pilot.pause(0.5)
save("tui-camera", app.export_screenshot())
await pilot.press("f6")
await pilot.pause(0.3)
print("\nDone!")
if __name__ == "__main__":
asyncio.run(capture_all())

View File

@ -1,21 +1,31 @@
"""Take screenshots of all TUI screens in demo mode for documentation."""
"""Take screenshots of all TUI screens in demo mode for documentation.
Generates SVG snapshots of every screen and sub-mode including overlays.
All data comes from DemoDevice and DemoCraftClient no hardware or
network required.
Usage:
cd tui && uv run python scripts/take_screenshots.py
"""
import asyncio
from birdcage_tui.app import BirdcageApp
SCREENS = {
"f1": "dashboard",
"f2": "control",
"f3": "signal",
"f4": "system",
}
from birdcage_tui.screens.control import ControlScreen
from birdcage_tui.widgets.craft_panel import CraftPanel
OUT_DIR = "/home/rpm/claude/ham/satellite/winegard-travler/site/public/screenshots"
async def main():
for key, name in SCREENS.items():
async def _screenshot_basic_tabs():
"""F1 Dashboard, F2 Control (manual), F3 Signal, F4 System."""
tabs = {
"f1": "dashboard",
"f2": "control",
"f3": "signal",
"f4": "system",
}
for key, name in tabs.items():
app = BirdcageApp()
app.demo_mode = True
@ -23,7 +33,6 @@ async def main():
await pilot.pause()
await pilot.press(key)
await pilot.pause()
# Let workers populate data.
await asyncio.sleep(1.5)
path = f"{OUT_DIR}/tui-{name}.svg"
@ -31,4 +40,107 @@ async def main():
print(f"Saved {path}")
async def _screenshot_craft_search():
"""F2 Control > Craft sub-mode with search results."""
app = BirdcageApp()
app.demo_mode = True
async with app.run_test(size=(120, 40)) as pilot:
await pilot.pause()
await pilot.press("f2")
await pilot.pause()
control = app.query_one("#control", ControlScreen)
control.switch_mode("craft")
await pilot.pause()
# Trigger a search — uses DemoCraftClient
panel = app.query_one("#ctrl-craft-panel", CraftPanel)
panel.post_message(CraftPanel.SearchRequested(""))
await asyncio.sleep(1.0)
path = f"{OUT_DIR}/tui-craft-search.svg"
app.save_screenshot(path)
print(f"Saved {path}")
async def _screenshot_craft_tracking():
"""F2 Control > Craft sub-mode actively tracking the Moon."""
app = BirdcageApp()
app.demo_mode = True
async with app.run_test(size=(120, 40)) as pilot:
await pilot.pause()
await pilot.press("f2")
await pilot.pause()
control = app.query_one("#control", ControlScreen)
control.switch_mode("craft")
await pilot.pause()
# Start tracking the Moon (always above horizon in demo)
panel = app.query_one("#ctrl-craft-panel", CraftPanel)
panel.post_message(
CraftPanel.TrackRequested(
target_type="celestial",
target_id="moon",
name="Moon",
min_el=5.0,
)
)
await asyncio.sleep(2.5)
path = f"{OUT_DIR}/tui-craft-tracking.svg"
app.save_screenshot(path)
print(f"Saved {path}")
control._stop_craft_tracking()
await pilot.pause()
async def _screenshot_console():
"""F5 Console overlay."""
app = BirdcageApp()
app.demo_mode = True
async with app.run_test(size=(120, 40)) as pilot:
await pilot.pause()
await asyncio.sleep(1.0)
await pilot.press("f5")
await pilot.pause()
await asyncio.sleep(0.5)
path = f"{OUT_DIR}/tui-console.svg"
app.save_screenshot(path)
print(f"Saved {path}")
async def _screenshot_camera():
"""F6 Camera overlay."""
app = BirdcageApp()
app.demo_mode = True
async with app.run_test(size=(120, 40)) as pilot:
await pilot.pause()
await asyncio.sleep(1.0)
await pilot.press("f6")
await pilot.pause()
await asyncio.sleep(0.5)
path = f"{OUT_DIR}/tui-camera.svg"
app.save_screenshot(path)
print(f"Saved {path}")
async def main():
await _screenshot_basic_tabs()
await _screenshot_craft_search()
await _screenshot_craft_tracking()
await _screenshot_console()
await _screenshot_camera()
print(f"\nAll screenshots saved to {OUT_DIR}/")
asyncio.run(main())

View File

@ -102,15 +102,14 @@ class BirdcageApp(App):
def _setup_device(self) -> None:
"""Create device (demo or real) and hand it to each screen."""
if self.demo_mode:
from birdcage_tui.demo import DemoDevice
from birdcage.demo import DemoDevice
self.device = DemoDevice()
self.device.connect()
else:
from birdcage.bridge import SerialBridge
from birdcage.protocol import get_protocol
from birdcage_tui.bridge import SerialBridge
protocol = get_protocol(self.firmware_name)
self.device = SerialBridge(protocol)
self.device.connect(self.serial_port)
@ -141,9 +140,14 @@ class BirdcageApp(App):
def _setup_craft_client(self) -> None:
"""Create a Craft API client and hand it to the control screen."""
from birdcage_tui.craft_client import CraftClient
if self.demo_mode:
from birdcage.demo import DemoCraftClient
client = CraftClient(base_url=self.craft_url)
client = DemoCraftClient()
else:
from birdcage.craft_client import CraftClient
client = CraftClient(base_url=self.craft_url)
try:
control = self.query_one("#control")
if hasattr(control, "set_craft_client"):

View File

@ -9,6 +9,7 @@ import contextlib
import logging
import threading
from birdcage.craft_client import CraftClient, CraftTrackingState
from textual import work
from textual.app import ComposeResult
from textual.binding import Binding
@ -16,7 +17,6 @@ from textual.containers import Container, Horizontal, Vertical
from textual.widgets import Button, ContentSwitcher, Input, Static
from textual.worker import Worker
from birdcage_tui.craft_client import CraftClient, CraftTrackingState
from birdcage_tui.rotctld_server import TrackingState, TuiRotctldServer
from birdcage_tui.widgets.compass_rose import CompassRose
from birdcage_tui.widgets.craft_panel import CraftPanel

View File

@ -115,7 +115,7 @@ class SignalScreen(Container):
yield Static(" Hz", classes="label")
yield Button("Start", id="btn-start", variant="primary")
yield Button("Stop", id="btn-stop")
yield Button("Enable LNA", id="btn-lna")
yield Button("V-pol 13V", id="btn-lna")
yield Button("Reset Peak", id="btn-reset-peak")
# -- Sweep mode -------------------------------------------
@ -350,9 +350,13 @@ class SignalScreen(Container):
label = "YES" if locked else "NO"
self.query_one("#lock-status", Static).update(f" Lock: {label}")
def _update_lna_label(self) -> None:
label = "ON" if self._lna_enabled else "OFF"
self.query_one("#lna-status", Static).update(f" LNA: {label}")
def _update_lna_ui(self) -> None:
if self._lna_enabled:
self.query_one("#lna-status", Static).update(" LNA: ON (V-pol)")
self.query_one("#btn-lna", Button).label = "H-pol 18V"
else:
self.query_one("#lna-status", Static).update(" LNA: OFF (H-pol)")
self.query_one("#btn-lna", Button).label = "V-pol 13V"
def _update_status_strip_rssi(self, rssi_avg: int) -> None:
"""Push RSSI to the app-level StatusStrip."""
@ -386,19 +390,29 @@ class SignalScreen(Container):
if self._device is None:
self.app.notify("No device connected", severity="warning")
return
self._do_enable_lna()
self._do_toggle_lnb()
@work(thread=True, exclusive=False, group="signal-cmd")
def _do_enable_lna(self) -> None:
def _do_toggle_lnb(self) -> None:
try:
self._device.enable_lna()
self._lna_enabled = True
self.app.call_from_thread(self._update_lna_label)
self.app.call_from_thread(self.app.notify, "LNA enabled (13V ODU)")
if self._lna_enabled:
self._device.set_lnb_voltage("stb")
self._lna_enabled = False
self.app.call_from_thread(self._update_lna_ui)
self.app.call_from_thread(
self.app.notify, "LNB set to 18V (H-pol)"
)
else:
self._device.set_lnb_voltage("odu")
self._lna_enabled = True
self.app.call_from_thread(self._update_lna_ui)
self.app.call_from_thread(
self.app.notify, "LNB set to 13V (V-pol, LNA on)"
)
except Exception:
log.exception("LNA enable failed")
log.exception("LNB voltage switch failed")
self.app.call_from_thread(
self.app.notify, "LNA enable failed", severity="error"
self.app.notify, "LNB voltage switch failed", severity="error"
)
def _handle_reset_peak(self) -> None:

View File

@ -13,7 +13,7 @@ from unittest.mock import MagicMock
import pytest
from birdcage_tui.app import BirdcageApp
from birdcage_tui.craft_client import PassPrediction, SearchResult, TargetPosition
from birdcage.craft_client import PassPrediction, SearchResult, TargetPosition
from birdcage_tui.screens.control import ControlScreen
from birdcage_tui.widgets.craft_panel import CraftPanel, CraftTrackingStatus

View File

@ -0,0 +1,164 @@
"""Test DemoCraftClient — full Craft lifecycle in demo mode.
Unlike test_craft_mode.py which uses MagicMock, these tests run with the
real DemoCraftClient to verify the end-to-end demo experience: search,
pass predictions, and tracking with time-varying satellite positions.
No network calls are made.
"""
import asyncio
import pytest
from birdcage_tui.app import BirdcageApp
from birdcage_tui.screens.control import ControlScreen
from birdcage_tui.widgets.craft_panel import CraftPanel, CraftTrackingStatus
async def _switch_to_craft(pilot, app) -> None:
"""Navigate to F2 Control > Craft sub-mode."""
await pilot.press("f2")
await pilot.pause()
control = app.query_one("#control", ControlScreen)
control.switch_mode("craft")
await pilot.pause()
@pytest.mark.asyncio
async def test_demo_search_returns_results():
"""DemoCraftClient search should populate the DataTable."""
app = BirdcageApp()
app.demo_mode = True
async with app.run_test(size=(120, 40)) as pilot:
await pilot.pause()
await _switch_to_craft(pilot, app)
panel = app.query_one("#ctrl-craft-panel", CraftPanel)
panel.post_message(CraftPanel.SearchRequested("ISS"))
await asyncio.sleep(0.5)
from textual.widgets import DataTable
table = app.query_one("#craft-results-table", DataTable)
assert table.row_count >= 1
first_key = list(table.rows.keys())[0]
row = table.get_row(first_key)
assert "ISS" in row[0]
@pytest.mark.asyncio
async def test_demo_broad_search():
"""Empty query should return the full demo catalog."""
app = BirdcageApp()
app.demo_mode = True
async with app.run_test(size=(120, 40)) as pilot:
await pilot.pause()
await _switch_to_craft(pilot, app)
panel = app.query_one("#ctrl-craft-panel", CraftPanel)
panel.post_message(CraftPanel.SearchRequested(""))
await asyncio.sleep(0.5)
from textual.widgets import DataTable
table = app.query_one("#craft-results-table", DataTable)
assert table.row_count == 8 # Full demo catalog
@pytest.mark.asyncio
async def test_demo_passes():
"""DemoCraftClient should return synthetic pass predictions."""
app = BirdcageApp()
app.demo_mode = True
async with app.run_test(size=(120, 40)) as pilot:
await pilot.pause()
await _switch_to_craft(pilot, app)
control = app.query_one("#control", ControlScreen)
panel = app.query_one("#ctrl-craft-panel", CraftPanel)
panel.post_message(CraftPanel.PassesRequested(norad_id=25544))
await asyncio.sleep(0.5)
from birdcage_tui.widgets.craft_panel import CraftPassInfo
info = app.query_one("#craft-pass-info", CraftPassInfo)
# Should contain max elevation of 55.0 degrees (ISS arc)
assert "55.0" in info.passes_text
@pytest.mark.asyncio
async def test_demo_tracking_drives_device():
"""DemoCraftClient tracking should move the demo device."""
app = BirdcageApp()
app.demo_mode = True
async with app.run_test(size=(120, 40)) as pilot:
await pilot.pause()
await _switch_to_craft(pilot, app)
panel = app.query_one("#ctrl-craft-panel", CraftPanel)
# Track the Moon — always above horizon in demo
panel.post_message(
CraftPanel.TrackRequested(
target_type="celestial",
target_id="moon",
name="Moon",
min_el=5.0,
)
)
await asyncio.sleep(2.5)
status = app.query_one("#craft-tracking-status", CraftTrackingStatus)
assert status.state == "TRACKING"
assert status.moves >= 1
# Device should have been commanded toward the Moon's position
assert app.device._target_az > 100.0 # Moon is around AZ 145
assert app.device._target_el > 20.0 # Moon is around EL 32
# Screenshot for visual verification
app.save_screenshot("/tmp/birdcage_demo_craft_tracking.svg")
# Stop tracking
control = app.query_one("#control", ControlScreen)
control._stop_craft_tracking()
await pilot.pause()
@pytest.mark.asyncio
async def test_demo_tracking_waiting_below_horizon():
"""Tracking a below-horizon LEO target shows WAITING."""
app = BirdcageApp()
app.demo_mode = True
async with app.run_test(size=(120, 40)) as pilot:
await pilot.pause()
await _switch_to_craft(pilot, app)
panel = app.query_one("#ctrl-craft-panel", CraftPanel)
# SO-50 may be below horizon depending on timing.
# Use Jupiter at min_el=90 to guarantee WAITING.
panel.post_message(
CraftPanel.TrackRequested(
target_type="celestial",
target_id="jupiter",
name="Jupiter",
min_el=90.0, # Impossible — forces WAITING
)
)
await asyncio.sleep(2.0)
status = app.query_one("#craft-tracking-status", CraftTrackingStatus)
assert status.state == "WAITING"
# Cleanup
control = app.query_one("#control", ControlScreen)
control._stop_craft_tracking()
await pilot.pause()